diff --git a/CHANGELOG.md b/CHANGELOG.md index eddd7bea304f6..2b77193a7c73d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.apache.commons:commons-configuration2` from 2.10.1 to 2.11.0 ([#14399](https://github.com/opensearch-project/OpenSearch/pull/14399)) - Bump `com.gradle.develocity` from 3.17.4 to 3.17.5 ([#14397](https://github.com/opensearch-project/OpenSearch/pull/14397)) - Bump `opentelemetry` from 1.36.0 to 1.39.0 ([#14457](https://github.com/opensearch-project/OpenSearch/pull/14457)) +- Bump `azure-identity` from 1.11.4 to 1.13.0, Bump `msal4j` from 1.14.3 to 1.15.1, Bump `msal4j-persistence-extension` from 1.2.0 to 1.3.0 ([#14506](https://github.com/opensearch-project/OpenSearch/pull/14506)) ### Changed - unsignedLongRangeQuery now returns MatchNoDocsQuery if the lower bounds are greater than the upper bounds ([#14416](https://github.com/opensearch-project/OpenSearch/pull/14416)) @@ -33,10 +34,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix bug in SBP cancellation logic ([#13474](https://github.com/opensearch-project/OpenSearch/pull/13474)) - Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379)) - Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086)) - Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155)) - Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465)) +- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190)) +- Fix aggs result of 
NestedAggregator with sub NestedAggregator ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324)) +- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495)) - Refactoring Grok.validatePatternBank by using an iterative approach ([#14206](https://github.com/opensearch-project/OpenSearch/pull/14206)) ### Security diff --git a/TESTING.md b/TESTING.md index 3928b2fc93afa..e8416f61be7e1 100644 --- a/TESTING.md +++ b/TESTING.md @@ -91,21 +91,23 @@ This will instruct all JVMs (including any that run cli tools such as creating t ## Test case filtering -- `tests.class` is a class-filtering shell-like glob pattern -- `tests.method` is a method-filtering glob pattern. +To be able to run a single test you need to specify the module where you're running the tests from. + +Example: `./gradlew server:test --tests "*.ReplicaShardBatchAllocatorTests.testNoAsyncFetchData"` Run a single test case (variants) - ./gradlew test -Dtests.class=org.opensearch.package.ClassName - ./gradlew test "-Dtests.class=*.ClassName" + ./gradlew module:test --tests org.opensearch.package.ClassName + ./gradlew module:test --tests org.opensearch.package.ClassName.testName + ./gradlew module:test --tests "*.ClassName" Run all tests in a package and its sub-packages - ./gradlew test "-Dtests.class=org.opensearch.package.*" + ./gradlew module:test --tests "org.opensearch.package.*" Run any test methods that contain *esi* (e.g.: .r*esi*ze.) 
- ./gradlew test "-Dtests.method=*esi*" + ./gradlew module:test --tests "*esi*" Run all tests that are waiting for a bugfix (disabled by default) diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 0fd30af71dd0a..f3aa64316b667 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -57,11 +57,11 @@ dependencies { api "io.netty:netty-transport-native-unix-common:${versions.netty}" implementation project(':modules:transport-netty4') api 'com.azure:azure-storage-blob:12.23.0' - api 'com.azure:azure-identity:1.11.4' + api 'com.azure:azure-identity:1.13.0' // Start of transitive dependencies for azure-identity - api 'com.microsoft.azure:msal4j-persistence-extension:1.2.0' + api 'com.microsoft.azure:msal4j-persistence-extension:1.3.0' api "net.java.dev.jna:jna-platform:${versions.jna}" - api 'com.microsoft.azure:msal4j:1.14.3' + api 'com.microsoft.azure:msal4j:1.15.1' api 'com.nimbusds:oauth2-oidc-sdk:11.9.1' api 'com.nimbusds:nimbus-jose-jwt:9.40' api 'com.nimbusds:content-type:2.3' diff --git a/plugins/repository-azure/licenses/azure-identity-1.11.4.jar.sha1 b/plugins/repository-azure/licenses/azure-identity-1.11.4.jar.sha1 deleted file mode 100644 index c8d98ba9c8ad2..0000000000000 --- a/plugins/repository-azure/licenses/azure-identity-1.11.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -59b5ce48888f638b80d85ef5aa0e22a265d3dc89 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/azure-identity-1.13.0.jar.sha1 b/plugins/repository-azure/licenses/azure-identity-1.13.0.jar.sha1 new file mode 100644 index 0000000000000..b59c2a3be5c92 --- /dev/null +++ b/plugins/repository-azure/licenses/azure-identity-1.13.0.jar.sha1 @@ -0,0 +1 @@ +54b44a74636322d06e9dc42d611a9f12a0966790 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/msal4j-1.14.3.jar.sha1 b/plugins/repository-azure/licenses/msal4j-1.14.3.jar.sha1 deleted file mode 100644 index 
2a6e42e3f2b48..0000000000000 --- a/plugins/repository-azure/licenses/msal4j-1.14.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -117b28c41bd760f979ed1b6467c5ec491f0d4d60 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/msal4j-1.15.1.jar.sha1 b/plugins/repository-azure/licenses/msal4j-1.15.1.jar.sha1 new file mode 100644 index 0000000000000..797f21d0d4995 --- /dev/null +++ b/plugins/repository-azure/licenses/msal4j-1.15.1.jar.sha1 @@ -0,0 +1 @@ +cd1daa94b81bd97153536b661c31295f99cbb8e7 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/msal4j-persistence-extension-1.2.0.jar.sha1 b/plugins/repository-azure/licenses/msal4j-persistence-extension-1.2.0.jar.sha1 deleted file mode 100644 index cfcf7548b7694..0000000000000 --- a/plugins/repository-azure/licenses/msal4j-persistence-extension-1.2.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -1111a95878de8745ddc9de132df18ebd9ca7024d \ No newline at end of file diff --git a/plugins/repository-azure/licenses/msal4j-persistence-extension-1.3.0.jar.sha1 b/plugins/repository-azure/licenses/msal4j-persistence-extension-1.3.0.jar.sha1 new file mode 100644 index 0000000000000..9c5909e7ff240 --- /dev/null +++ b/plugins/repository-azure/licenses/msal4j-persistence-extension-1.3.0.jar.sha1 @@ -0,0 +1 @@ +8a8ef1517d27a5b4de1512ef94679bdb59f210b6 \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 70da3b0e38472..2421a1a507372 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -1405,7 +1405,7 @@ public void testPitCreatedOnReplica() throws Exception { .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) 
.setRequestCache(false) .get(); - PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId()); List currentFiles = List.of(replicaShard.store().directory().listAll()); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/pit/PitMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/search/pit/PitMultiNodeIT.java index 8bea5ef97fbba..faec3977f94ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/pit/PitMultiNodeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/pit/PitMultiNodeIT.java @@ -104,7 +104,7 @@ public void testPit() throws Exception { assertEquals(2, searchResponse.getSuccessfulShards()); assertEquals(2, searchResponse.getTotalShards()); validatePitStats("index", 2, 2); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), pitResponse.getId()); } @@ -131,7 +131,12 @@ public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exce public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits( + client(), + pitResponse.getId(), + pitResponse.getCreationTime(), + TimeValue.timeValueDays(1) + ); assertSegments(false, "index", 1, client(), pitResponse.getId()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); 
@@ -164,7 +169,12 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); validatePitStats("index", 1, 1); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits( + client(), + pitResponse.getId(), + pitResponse.getCreationTime(), + TimeValue.timeValueDays(1) + ); return super.onNodeStopped(nodeName); } }); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index b41dd99ff6d40..2440a3c64e956 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -17,6 +17,7 @@ import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; @@ -53,11 +54,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -132,21 +135,24 @@ public void testCreateSearchableSnapshot() throws Exception { public void 
testSnapshottingSearchableSnapshots() throws Exception { final String repoName = "test-repo"; + final String initSnapName = "initial-snapshot"; final String indexName = "test-idx"; + final String repeatSnapNamePrefix = "test-repeated-snap-"; + final String repeatIndexNamePrefix = indexName + "-copy-"; final Client client = client(); // create an index, add data, snapshot it, then delete it internalCluster().ensureAtLeastNumDataNodes(1); createIndexWithDocsAndEnsureGreen(0, 100, indexName); createRepositoryWithSettings(null, repoName); - takeSnapshot(client, "initial-snapshot", repoName, indexName); + takeSnapshot(client, initSnapName, repoName, indexName); deleteIndicesAndEnsureGreen(client, indexName); // restore the index as a searchable snapshot internalCluster().ensureAtLeastNumSearchNodes(1); client.admin() .cluster() - .prepareRestoreSnapshot(repoName, "initial-snapshot") + .prepareRestoreSnapshot(repoName, initSnapName) .setRenamePattern("(.+)") .setRenameReplacement("$1-copy-0") .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) @@ -159,7 +165,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception { // Test that the searchable snapshot index can continue to be snapshotted and restored for (int i = 0; i < 4; i++) { - final String repeatedSnapshotName = "test-repeated-snap-" + i; + final String repeatedSnapshotName = repeatSnapNamePrefix + i; takeSnapshot(client, repeatedSnapshotName, repoName); deleteIndicesAndEnsureGreen(client, "_all"); client.admin() @@ -181,21 +187,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception { final Map> snapshotInfoMap = response.getSnapshots() .stream() .collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices)); - assertEquals( - Map.of( - "initial-snapshot", - List.of("test-idx"), - "test-repeated-snap-0", - List.of("test-idx-copy-0"), - "test-repeated-snap-1", - List.of("test-idx-copy-1"), - "test-repeated-snap-2", - List.of("test-idx-copy-2"), - 
"test-repeated-snap-3", - List.of("test-idx-copy-3") - ), - snapshotInfoMap - ); + final Map> expect = new HashMap<>(); + expect.put(initSnapName, List.of(indexName)); + IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i))); + assertEquals(expect, snapshotInfoMap); + + String[] snapNames = new String[5]; + IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i); + snapNames[4] = initSnapName; + SnapshotsStatusResponse snapshotsStatusResponse = client.admin() + .cluster() + .prepareSnapshotStatus(repoName) + .addSnapshots(snapNames) + .execute() + .actionGet(); + snapshotsStatusResponse.getSnapshots().forEach(s -> { + String snapName = s.getSnapshot().getSnapshotId().getName(); + assertEquals(1, s.getIndices().size()); + assertEquals(1, s.getShards().size()); + if (snapName.equals("initial-snapshot")) { + assertNotNull(s.getIndices().get("test-idx")); + assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0); + } else { + assertTrue(snapName.startsWith(repeatSnapNamePrefix)); + assertEquals(1, s.getIndices().size()); + assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length()))); + assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount()); + } + }); } /** diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java index 7e4ed186dd665..ac321d961679a 100644 --- a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -53,6 +53,10 @@ public long getCreationTime() { return creationTime; } + public long getKeepAlive() { + return keepAlive; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java 
b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c41e97d278dd5..53c44f743c781 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2801,9 +2801,12 @@ public void snapshotShard( long indexIncrementalSize = 0; long indexTotalFileSize = 0; final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); - // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files - // in the commit with files already in the repository - if (filesFromSegmentInfos == null) { + if (store.indexSettings().isRemoteSnapshot()) { + // If the source of the data is another remote snapshot (i.e. searchable snapshot) then no need to snapshot the shard + indexCommitPointFiles = List.of(); + } else if (filesFromSegmentInfos == null) { + // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files + // in the commit with files already in the repository indexCommitPointFiles = new ArrayList<>(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; diff --git a/server/src/main/java/org/opensearch/search/ResourceType.java b/server/src/main/java/org/opensearch/search/ResourceType.java new file mode 100644 index 0000000000000..5bbcd7de1c2ce --- /dev/null +++ b/server/src/main/java/org/opensearch/search/ResourceType.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search; + +/** + * Enum to hold the resource type + */ +public enum ResourceType { + CPU("cpu"), + JVM("jvm"); + + private final String name; + + ResourceType(String name) { + this.name = name; + } + + /** + * The string match here is case-sensitive + * @param s name matching the resource type name + * @return a {@link ResourceType} + */ + public static ResourceType fromName(String s) { + for (ResourceType resourceType : values()) { + if (resourceType.getName().equals(s)) { + return resourceType; + } + } + throw new IllegalArgumentException("Unknown resource type: [" + s + "]"); + } + + private String getName() { + return name; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregator.java index cfa1d32a52501..150efa878f866 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -43,8 +43,10 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.search.join.BitSetProducer; import org.apache.lucene.util.BitSet; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.lucene.search.Queries; import org.opensearch.core.ParseField; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.ObjectMapper; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -88,12 +90,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA ) throws IOException { super(name, factories, context, parent, cardinality, metadata); - Query parentFilter = parentObjectMapper != null ? 
parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter(); + Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService()) + ? parentObjectMapper.nestedTypeFilter() + : Queries.newNonNestedFilter(); this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter); this.childFilter = childObjectMapper.nestedTypeFilter(); this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2); } + private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) { + if (parentObjectMapper == null) { + return false; + } + ObjectMapper parent = childObjectMapper; + do { + parent = parent.getParentObjectMapper(mapperService); // climb one ancestor per iteration so the loop always terminates + } while (parent != null && parent != parentObjectMapper); + return parentObjectMapper == parent; + } + @Override public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx); @@ -107,20 +122,17 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L if (collectsFromSingleBucket) { return new LeafBucketCollectorBase(sub, null) { @Override - public void collect(int parentDoc, long bucket) throws IOException { - // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent - // doc), so we can skip: - if (parentDoc == 0 || parentDocs == null || childDocs == null) { + public void collect(int parentAggDoc, long bucket) throws IOException { + // parentAggDoc may be 0 (it is not necessarily a block-level parent doc), so doc 0 is not skipped: + if (parentDocs == null || childDocs == null) { return; } - final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1); - int childDocId = childDocs.docID(); - if (childDocId <= prevParentDoc) { - childDocId = childDocs.advance(prevParentDoc + 1); - } + Tuple res = getParentAndChildId(parentDocs, childDocs, parentAggDoc); + int currentParentDoc = 
res.v1(); + int childDocId = res.v2(); - for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) { + for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) { collectBucket(sub, childDocId, bucket); } } @@ -130,6 +142,43 @@ public void collect(int parentDoc, long bucket) throws IOException { } } + /** + * In one case, it's talking about the parent doc (from the Lucene block-join standpoint), + * while in the other case, it's talking about a child doc ID (from the block-join standpoint) + * from the parent aggregation, where we're trying to aggregate over a sibling of that child. + * So, we need to map from that document to its parent, then join to the appropriate sibling. + * + * @param parentAggDoc the parent aggregation's current doc + * (which may or may not be a block-level parent doc) + * @return a tuple consisting of the current block-level parent doc (the parent of the + * parameter doc), and the next matching child doc (hopefully under this parent) + * for the aggregation (according to the child doc iterator). 
+ */ + static Tuple getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentAggDoc) throws IOException { + int currentParentAggDoc; + int prevParentDoc = parentDocs.prevSetBit(parentAggDoc); + if (prevParentDoc == -1) { + currentParentAggDoc = parentDocs.nextSetBit(0); + } else if (prevParentDoc == parentAggDoc) { + // parentAggDoc is itself a block-level parent doc (its bit is set in parentDocs) + currentParentAggDoc = parentAggDoc; + if (currentParentAggDoc == 0) { + prevParentDoc = -1; + } else { + prevParentDoc = parentDocs.prevSetBit(currentParentAggDoc - 1); + } + } else { + // parentAggDoc is a child doc; its block-join parent is the next set bit after prevParentDoc + currentParentAggDoc = parentDocs.nextSetBit(prevParentDoc + 1); + } + + int childDocId = childDocs.docID(); + if (childDocId <= prevParentDoc) { + childDocId = childDocs.advance(prevParentDoc + 1); + } + return Tuple.tuple(currentParentAggDoc, childDocId); + } + @Override protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException { super.preGetSubLeafCollectors(ctx); @@ -191,9 +240,8 @@ public void setScorer(Scorable scorer) throws IOException { @Override public void collect(int parentDoc, long bucket) throws IOException { - // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent - // doc), so we can skip: - if (parentDoc == 0 || parentDocs == null || childDocs == null) { + // parentDoc may be 0 here, so only missing parent/child iterators cause a skip: + if (parentDocs == null || childDocs == null) { return; } @@ -214,11 +262,9 @@ void processBufferedChildBuckets() throws IOException { return; } - final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1); - int childDocId = childDocs.docID(); - if (childDocId <= prevParentDoc) { - childDocId = childDocs.advance(prevParentDoc + 1); - } + Tuple res = getParentAndChildId(parentDocs, childDocs, currentParentDoc); + int currentParentDoc = res.v1(); + int childDocId = res.v2(); for (; 
childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) { cachedScorer.doc = childDocId; diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index ebf9623eb367a..3e8ed3070e4ef 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -18,6 +18,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; +import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -28,9 +29,11 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.NodeDuressTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.tasks.Task; @@ -44,11 +47,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import 
java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.DoubleSupplier; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -62,7 +67,14 @@ */ public class SearchBackpressureService extends AbstractLifecycleComponent implements TaskCompletionListener { private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class); - + private static final Map> trackerApplyConditions = Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (nodeDuressTrackers) -> nodeDuressTrackers.isResourceInDuress(ResourceType.CPU), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (nodeDuressTrackers) -> isHeapTrackingSupported() && nodeDuressTrackers.isResourceInDuress(ResourceType.JVM), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, + (nodeDuressTrackers) -> true + ); private volatile Scheduler.Cancellable scheduledFuture; private final SearchBackpressureSettings settings; @@ -70,8 +82,8 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem private final ThreadPool threadPool; private final LongSupplier timeNanosSupplier; - private final List nodeDuressTrackers; - private final Map, List> taskTrackers; + private final NodeDuressTrackers nodeDuressTrackers; + private final Map, TaskResourceUsageTrackers> taskTrackers; private final Map, SearchBackpressureState> searchBackpressureStates; private final TaskManager taskManager; @@ -82,19 +94,26 @@ public SearchBackpressureService( ThreadPool threadPool, TaskManager taskManager ) { - this( - settings, - taskResourceTrackingService, - threadPool, - System::nanoTime, - List.of( - new NodeDuressTracker( - () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold() - ), - new NodeDuressTracker( - () -> 
JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() - ) - ), + this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) { + { + put( + ResourceType.CPU, + new NodeDuressTracker( + () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings() + .getCpuThreshold(), + () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches() + ) + ); + put( + ResourceType.JVM, + new NodeDuressTracker( + () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings() + .getHeapThreshold(), + () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches() + ) + ); + } + }), getTrackers( settings.getSearchTaskSettings()::getCpuTimeNanosThreshold, settings.getSearchTaskSettings()::getHeapVarianceThreshold, @@ -117,14 +136,14 @@ public SearchBackpressureService( ); } - public SearchBackpressureService( + SearchBackpressureService( SearchBackpressureSettings settings, TaskResourceTrackingService taskResourceTrackingService, ThreadPool threadPool, LongSupplier timeNanosSupplier, - List nodeDuressTrackers, - List searchTaskTrackers, - List searchShardTaskTrackers, + NodeDuressTrackers nodeDuressTrackers, + TaskResourceUsageTrackers searchTaskTrackers, + TaskResourceUsageTrackers searchShardTaskTrackers, TaskManager taskManager ) { this.settings = settings; @@ -163,40 +182,48 @@ void doRun() { return; } - if (isNodeInDuress() == false) { + if (nodeDuressTrackers.isNodeInDuress() == false) { return; } List searchTasks = getTaskByType(SearchTask.class); List searchShardTasks = getTaskByType(SearchShardTask.class); - List cancellableTasks = new ArrayList<>(); + + boolean isHeapUsageDominatedBySearchTasks = isHeapUsageDominatedBySearch( + searchTasks, + getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold() + ); + boolean isHeapUsageDominatedBySearchShardTasks = 
isHeapUsageDominatedBySearch( + searchShardTasks, + getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold() + ); + final Map, List> cancellableTasks = Map.of( + SearchTask.class, + isHeapUsageDominatedBySearchTasks ? searchTasks : Collections.emptyList(), + SearchShardTask.class, + isHeapUsageDominatedBySearchShardTasks ? searchShardTasks : Collections.emptyList() + ); // Force-refresh usage stats of these tasks before making a cancellation decision. taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0])); taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0])); - // Check if increase in heap usage is due to SearchTasks - if (HeapUsageTracker.isHeapUsageDominatedBySearch( - searchTasks, - getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold() - )) { - cancellableTasks.addAll(searchTasks); - } + List taskCancellations = new ArrayList<>(); - // Check if increase in heap usage is due to SearchShardTasks - if (HeapUsageTracker.isHeapUsageDominatedBySearch( - searchShardTasks, - getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold() - )) { - cancellableTasks.addAll(searchShardTasks); + for (TaskResourceUsageTrackerType trackerType : TaskResourceUsageTrackerType.values()) { + if (shouldApply(trackerType)) { + addResourceTrackerBasedCancellations(trackerType, taskCancellations, cancellableTasks); + } } - // none of the task type is breaching the heap usage thresholds and hence we do not cancel any tasks - if (cancellableTasks.isEmpty()) { - return; - } + // Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task + // We need to merge them + taskCancellations = mergeTaskCancellations(taskCancellations).stream() + .map(this::addSBPStateUpdateCallback) + .filter(TaskCancellation::isEligibleForCancellation) + .collect(Collectors.toList()); - for (TaskCancellation taskCancellation : 
getTaskCancellations(cancellableTasks)) { + for (TaskCancellation taskCancellation : taskCancellations) { logger.warn( "[{} mode] cancelling task [{}] due to high resource consumption [{}]", mode.getName(), @@ -226,6 +253,66 @@ void doRun() { } } + /** + * Had to define this method to help mock this static method to test the scenario where SearchTraffic should not be + * penalised when not breaching the threshold + * @param searchTasks inFlight co-ordinator requests + * @param threshold minimum jvm allocated bytes ratio w.r.t. available heap + * @return a boolean value based on whether the threshold is breached + */ + boolean isHeapUsageDominatedBySearch(List searchTasks, double threshold) { + return HeapUsageTracker.isHeapUsageDominatedBySearch(searchTasks, threshold); + } + + private TaskCancellation addSBPStateUpdateCallback(TaskCancellation taskCancellation) { + CancellableTask task = taskCancellation.getTask(); + Runnable toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount; + if (task instanceof SearchTask) { + toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount; + } + List newOnCancelCallbacks = new ArrayList<>(taskCancellation.getOnCancelCallbacks()); + newOnCancelCallbacks.add(toAddCancellationCallbackForSBPState); + return new TaskCancellation(task, taskCancellation.getReasons(), newOnCancelCallbacks); + } + + private boolean shouldApply(TaskResourceUsageTrackerType trackerType) { + return trackerApplyConditions.get(trackerType).apply(nodeDuressTrackers); + } + + private List addResourceTrackerBasedCancellations( + TaskResourceUsageTrackerType type, + List taskCancellations, + Map, List> cancellableTasks + ) { + for (Map.Entry, TaskResourceUsageTrackers> taskResourceUsageTrackers : taskTrackers + .entrySet()) { + final Optional taskResourceUsageTracker = taskResourceUsageTrackers.getValue().getTracker(type); + final Class 
taskType = taskResourceUsageTrackers.getKey(); + + taskResourceUsageTracker.ifPresent( + tracker -> taskCancellations.addAll(tracker.getTaskCancellations(cancellableTasks.get(taskType))) + ); + } + + return taskCancellations; + } + + /** + * Method to reduce the taskCancellations into unique bunch + * @param taskCancellations all task cancellations + * @return unique task cancellations + */ + private List mergeTaskCancellations(final List taskCancellations) { + final Map uniqueTaskCancellations = new HashMap<>(); + + for (TaskCancellation taskCancellation : taskCancellations) { + final long taskId = taskCancellation.getTask().getId(); + uniqueTaskCancellations.put(taskId, uniqueTaskCancellations.getOrDefault(taskId, taskCancellation).merge(taskCancellation)); + } + + return new ArrayList<>(uniqueTaskCancellations.values()); + } + /** * Given a task, returns the type of the task */ @@ -243,16 +330,7 @@ Class getTaskType(Task task) { * Returns true if the node is in duress consecutively for the past 'n' observations. */ boolean isNodeInDuress() { - boolean isNodeInDuress = false; - int numSuccessiveBreaches = getSettings().getNodeDuressSettings().getNumSuccessiveBreaches(); - - for (NodeDuressTracker tracker : nodeDuressTrackers) { - if (tracker.check() >= numSuccessiveBreaches) { - isNodeInDuress = true; // not breaking the loop so that each tracker's streak gets updated. - } - } - - return isNodeInDuress; + return nodeDuressTrackers.isNodeInDuress(); } /* @@ -271,39 +349,6 @@ List getTa .collect(Collectors.toUnmodifiableList()); } - /** - * Returns a TaskCancellation wrapper containing the list of reasons (possibly zero), along with an overall - * cancellation score for the given task. Cancelling a task with a higher score has better chance of recovering the - * node from duress. 
- */ - TaskCancellation getTaskCancellation(CancellableTask task) { - List reasons = new ArrayList<>(); - List callbacks = new ArrayList<>(); - Class taskType = getTaskType(task); - List trackers = taskTrackers.get(taskType); - for (TaskResourceUsageTracker tracker : trackers) { - Optional reason = tracker.checkAndMaybeGetCancellationReason(task); - if (reason.isPresent()) { - callbacks.add(tracker::incrementCancellations); - reasons.add(reason.get()); - } - } - callbacks.add(searchBackpressureStates.get(taskType)::incrementCancellationCount); - - return new TaskCancellation(task, reasons, callbacks); - } - - /** - * Returns a list of TaskCancellations sorted by descending order of their cancellation scores. - */ - List getTaskCancellations(List tasks) { - return tasks.stream() - .map(this::getTaskCancellation) - .filter(TaskCancellation::isEligibleForCancellation) - .sorted(Comparator.reverseOrder()) - .collect(Collectors.toUnmodifiableList()); - } - SearchBackpressureSettings getSettings() { return settings; } @@ -315,7 +360,7 @@ SearchBackpressureState getSearchBackpressureState(Class getTrackers( + public static TaskResourceUsageTrackers getTrackers( LongSupplier cpuThresholdSupplier, DoubleSupplier heapVarianceSupplier, DoubleSupplier heapPercentThresholdSupplier, @@ -324,23 +369,27 @@ public static List getTrackers( ClusterSettings clusterSettings, Setting windowSizeSetting ) { - List trackers = new ArrayList<>(); - trackers.add(new CpuUsageTracker(cpuThresholdSupplier)); + TaskResourceUsageTrackers trackers = new TaskResourceUsageTrackers(); + trackers.addTracker(new CpuUsageTracker(cpuThresholdSupplier), TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); if (isHeapTrackingSupported()) { - trackers.add( + trackers.addTracker( new HeapUsageTracker( heapVarianceSupplier, heapPercentThresholdSupplier, heapMovingAverageWindowSize, clusterSettings, windowSizeSetting - ) + ), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER ); } else { logger.warn("heap size 
couldn't be determined"); } - trackers.add(new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime)); - return Collections.unmodifiableList(trackers); + trackers.addTracker( + new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER + ); + return trackers; } @Override @@ -360,8 +409,8 @@ public void onTaskCompleted(Task task) { } List exceptions = new ArrayList<>(); - List trackers = taskTrackers.get(taskType); - for (TaskResourceUsageTracker tracker : trackers) { + TaskResourceUsageTrackers trackers = taskTrackers.get(taskType); + for (TaskResourceUsageTracker tracker : trackers.all()) { try { tracker.update(task); } catch (Exception e) { @@ -401,6 +450,7 @@ public SearchBackpressureStats nodeStats() { searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(), searchBackpressureStates.get(SearchTask.class).getCompletionCount(), taskTrackers.get(SearchTask.class) + .all() .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks))) ); @@ -410,6 +460,7 @@ public SearchBackpressureStats nodeStats() { searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(), searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(), taskTrackers.get(SearchShardTask.class) + .all() .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks))) ); diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java index ffe97d125b27a..be714271c8919 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java @@ -18,8 +18,8 @@ import 
org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import java.io.IOException; import java.util.Map; diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java index a7f9b4e3d004f..0f5f409b15def 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java @@ -18,8 +18,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import java.io.IOException; import java.util.Map; diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 507953cb4a20e..a303b625f4b59 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -12,6 +12,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; 
import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -34,7 +35,30 @@ public class CpuUsageTracker extends TaskResourceUsageTracker { private final LongSupplier thresholdSupplier; public CpuUsageTracker(LongSupplier thresholdSupplier) { + this(thresholdSupplier, (task) -> { + long usage = task.getTotalResourceStats().getCpuTimeInNanos(); + long threshold = thresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "cpu usage exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + }); + } + + public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) { this.thresholdSupplier = thresholdSupplier; + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; } @Override @@ -42,27 +66,6 @@ public String name() { return CPU_USAGE_TRACKER.getName(); } - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - long usage = task.getTotalResourceStats().getCpuTimeInNanos(); - long threshold = thresholdSupplier.getAsLong(); - - if (usage < threshold) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "cpu usage exceeded [" - + new TimeValue(usage, TimeUnit.NANOSECONDS) - + " >= " - + new TimeValue(threshold, TimeUnit.NANOSECONDS) - + "]", - 1 // TODO: fine-tune the cancellation score/weight - ) - ); - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); diff --git 
a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index f1e8abe7e3230..216947315cd2d 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -12,6 +12,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -34,8 +35,35 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker { private final LongSupplier timeNanosSupplier; public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) { + this(thresholdSupplier, timeNanosSupplier, (Task task) -> { + long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); + long threshold = thresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "elapsed time exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + }); + } + + public ElapsedTimeTracker( + LongSupplier thresholdSupplier, + LongSupplier timeNanosSupplier, + ResourceUsageBreachEvaluator resourceUsageBreachEvaluator + ) { this.thresholdSupplier = thresholdSupplier; this.timeNanosSupplier = timeNanosSupplier; + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; } @Override @@ -43,27 +71,6 @@ public String name() { return ELAPSED_TIME_TRACKER.getName(); } - @Override - public Optional 
checkAndMaybeGetCancellationReason(Task task) { - long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); - long threshold = thresholdSupplier.getAsLong(); - - if (usage < threshold) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "elapsed time exceeded [" - + new TimeValue(usage, TimeUnit.NANOSECONDS) - + " >= " - + new TimeValue(threshold, TimeUnit.NANOSECONDS) - + "]", - 1 // TODO: fine-tune the cancellation score/weight - ) - ); - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long now = timeNanosSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index 56b9f947f6e37..c69de8ce21f89 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -18,6 +18,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -55,6 +56,43 @@ public HeapUsageTracker( this.heapPercentThresholdSupplier = heapPercentThresholdSupplier; this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize); + setDefaultResourceUsageBreachEvaluator(); + } + + /** + * Had to refactor this method out of the constructor as we can't pass a lambda which references a member variable in constructor + * error: cannot reference movingAverageReference before supertype constructor has 
been called + */ + private void setDefaultResourceUsageBreachEvaluator() { + this.resourceUsageBreachEvaluator = (task) -> { + MovingAverage movingAverage = movingAverageReference.get(); + + // There haven't been enough measurements. + if (movingAverage.isReady() == false) { + return Optional.empty(); + } + + double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); + double averageUsage = movingAverage.getAverage(); + double variance = heapVarianceSupplier.getAsDouble(); + double allowedUsage = averageUsage * variance; + double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES; + + if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "heap usage exceeded [" + + new ByteSizeValue((long) currentUsage) + + " >= " + + new ByteSizeValue((long) allowedUsage) + + "]", + (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight + ) + ); + }; } @Override @@ -67,33 +105,6 @@ public void update(Task task) { movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); } - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - MovingAverage movingAverage = movingAverageReference.get(); - - // There haven't been enough measurements. 
- if (movingAverage.isReady() == false) { - return Optional.empty(); - } - - double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); - double averageUsage = movingAverage.getAverage(); - double variance = heapVarianceSupplier.getAsDouble(); - double allowedUsage = averageUsage * variance; - double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES; - - if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]", - (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight - ) - ); - } - private void updateWindowSize(int heapMovingAverageWindowSize) { this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java deleted file mode 100644 index 8e35c724a8fef..0000000000000 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.backpressure.trackers; - -import org.opensearch.common.util.Streak; - -import java.util.function.BooleanSupplier; - -/** - * NodeDuressTracker is used to check if the node is in duress. - * - * @opensearch.internal - */ -public class NodeDuressTracker { - /** - * Tracks the number of consecutive breaches. 
- */ - private final Streak breaches = new Streak(); - - /** - * Predicate that returns true when the node is in duress. - */ - private final BooleanSupplier isNodeInDuress; - - public NodeDuressTracker(BooleanSupplier isNodeInDuress) { - this.isNodeInDuress = isNodeInDuress; - } - - /** - * Evaluates the predicate and returns the number of consecutive breaches. - */ - public int check() { - return breaches.record(isNodeInDuress.getAsBoolean()); - } -} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java new file mode 100644 index 0000000000000..ae60a82fc2816 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.util.Streak; +import org.opensearch.search.ResourceType; + +import java.util.Map; +import java.util.function.BooleanSupplier; +import java.util.function.IntSupplier; + +/** + * NodeDuressTrackers is used to check if the node is in duress based on various resources. 
+ * + * @opensearch.internal + */ +public class NodeDuressTrackers { + private final Map duressTrackers; + + public NodeDuressTrackers(Map duressTrackers) { + this.duressTrackers = duressTrackers; + } + + /** + * Method to check the {@link ResourceType} in duress + * @return Boolean + */ + public boolean isResourceInDuress(ResourceType resourceType) { + return duressTrackers.get(resourceType).test(); + } + + /** + * Method to evaluate whether the node is in duress or not + * @return true if node is in duress because of either system resource + */ + public boolean isNodeInDuress() { + for (ResourceType resourceType : ResourceType.values()) { + if (isResourceInDuress(resourceType)) { + return true; + } + } + return false; + } + + /** + * NodeDuressTracker is used to check if the node is in duress + * @opensearch.internal + */ + public static class NodeDuressTracker { + /** + * Tracks the number of consecutive breaches. + */ + private final Streak breaches = new Streak(); + + /** + * Predicate that returns true when the node is in duress. + */ + private final BooleanSupplier isNodeInDuress; + + /** + * Predicate that returns the max number of breaches allowed for this resource before we mark it as in duress + */ + private final IntSupplier maxBreachAllowedSupplier; + + public NodeDuressTracker(BooleanSupplier isNodeInDuress, IntSupplier maxBreachAllowedSupplier) { + this.isNodeInDuress = isNodeInDuress; + this.maxBreachAllowedSupplier = maxBreachAllowedSupplier; + } + + /** + * Returns true if the node is in duress consecutively for the past 'n' observations. 
+ */ + public boolean test() { + return breaches.record(isNodeInDuress.getAsBoolean()) >= maxBreachAllowedSupplier.getAsInt(); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java deleted file mode 100644 index ce15e9e9b6622..0000000000000 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.backpressure.trackers; - -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskCancellation; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; - -/** - * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks. - * - * @opensearch.internal - */ -public abstract class TaskResourceUsageTracker { - /** - * Counts the number of cancellations made due to this tracker. - */ - private final AtomicLong cancellations = new AtomicLong(); - - public long incrementCancellations() { - return cancellations.incrementAndGet(); - } - - public long getCancellations() { - return cancellations.get(); - } - - /** - * Returns a unique name for this tracker. - */ - public abstract String name(); - - /** - * Notifies the tracker to update its state when a task execution completes. - */ - public void update(Task task) {} - - /** - * Returns the cancellation reason for the given task, if it's eligible for cancellation. 
- */ - public abstract Optional checkAndMaybeGetCancellationReason(Task task); - - /** - * Returns the tracker's state for tasks as seen in the stats API. - */ - public abstract Stats stats(List activeTasks); - - /** - * Represents the tracker's state as seen in the stats API. - */ - public interface Stats extends ToXContentObject, Writeable {} -} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java new file mode 100644 index 0000000000000..3b0072288681c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +/** + * TaskResourceUsageTrackers is used to hold all the {@link TaskResourceUsageTracker} objects. 
+ * + * @opensearch.internal + */ +public class TaskResourceUsageTrackers { + private final EnumMap all; + + public TaskResourceUsageTrackers() { + all = new EnumMap<>(TaskResourceUsageTrackerType.class); + } + + /** + * adds the tracker for the TrackerType + * @param tracker is {@link TaskResourceUsageTracker} implementation which will be added + * @param trackerType is {@link TaskResourceUsageTrackerType} which depicts the implementation type + */ + public void addTracker(final TaskResourceUsageTracker tracker, final TaskResourceUsageTrackerType trackerType) { + all.put(trackerType, tracker); + } + + /** + * getter for tracker for a {@link TaskResourceUsageTrackerType} + * @param type for which the implementation is returned + * @return the {@link TaskResourceUsageTracker} added for the given type, or empty if none was added + */ + public Optional getTracker(TaskResourceUsageTrackerType type) { + return Optional.ofNullable(all.get(type)); + } + + /** + * Method to access all available {@link TaskResourceUsageTracker} + * @return all enabled and available {@link TaskResourceUsageTracker}s + */ + public List all() { + return new ArrayList<>(all.values()); + } + + /** + * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks. + * @opensearch.internal + */ + public static abstract class TaskResourceUsageTracker { + /** + * Counts the number of cancellations made due to this tracker. 
+ */ + private final AtomicLong cancellations = new AtomicLong(); + protected ResourceUsageBreachEvaluator resourceUsageBreachEvaluator; + + /** + * for test purposes only + * @param resourceUsageBreachEvaluator which suggests whether a task should be cancelled or not + */ + public void setResourceUsageBreachEvaluator(final ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) { + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; + } + + public long incrementCancellations() { + return cancellations.incrementAndGet(); + } + + public long getCancellations() { + return cancellations.get(); + } + + /** + * Returns a unique name for this tracker. + */ + public abstract String name(); + + /** + * Notifies the tracker to update its state when a task execution completes. + */ + public void update(Task task) {} + + /** + * Returns the cancellation reason for the given task, if it's eligible for cancellation. + */ + public Optional checkAndMaybeGetCancellationReason(Task task) { + return resourceUsageBreachEvaluator.evaluate(task); + } + + /** + * Returns the tracker's state for tasks as seen in the stats API. 
+ */ + public abstract Stats stats(List activeTasks); + + /** + * Method to get taskCancellations due to this tracker for the given {@link CancellableTask} tasks + * @param tasks cancellation eligible tasks due to node duress and search traffic threshold breach + * @return the list of tasks which are breaching task level thresholds for this {@link TaskResourceUsageTracker} + */ + public List getTaskCancellations(List tasks) { + return tasks.stream() + .map(task -> this.getTaskCancellation(task, List.of(this::incrementCancellations))) + .filter(TaskCancellation::isEligibleForCancellation) + .collect(Collectors.toList()); + } + + private TaskCancellation getTaskCancellation(final CancellableTask task, final List cancellationCallback) { + Optional reason = checkAndMaybeGetCancellationReason(task); + List reasons = new ArrayList<>(); + reason.ifPresent(reasons::add); + + return new TaskCancellation(task, reasons, cancellationCallback); + } + + /** + * Represents the tracker's state as seen in the stats API. 
+ */ + public interface Stats extends ToXContentObject, Writeable {} + + /** + * This interface carries the logic to decide whether a task should be cancelled or not + */ + public interface ResourceUsageBreachEvaluator { + /** + * evaluates whether the task is eligible for cancellation based on {@link TaskResourceUsageTracker} implementation + * @param task is input to this method on which the cancellation evaluation is performed + * @return a {@link TaskCancellation.Reason} why this task should be cancelled otherwise empty + */ + public Optional evaluate(final Task task); + } + } +} diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 89f1ea142336e..8da36bbb8d4bd 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -276,53 +276,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); - } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); + snapshot( + shardId, + snapshot, + indexId, + entry.userMetadata(), + snapshotStatus, + entry.version(), + entry.remoteStoreIndexShallowCopy(), + new ActionListener<>() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = 
snapshotStatus.asCopy(); + logger.debug( + "snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); + } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); } + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure); } - ); - } + } + ); } }); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java index 2d152e513f197..872f5b79bb205 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java @@ -42,10 +42,25 @@ public List getReasons() { return reasons; } + public List getOnCancelCallbacks() { + return onCancelCallbacks; + } + public String getReasonString() { return reasons.stream().map(Reason::getMessage).collect(Collectors.joining(", ")); } + public TaskCancellation merge(final 
TaskCancellation other) { + if (other == this) { + return this; + } + final List newReasons = new ArrayList<>(reasons); + newReasons.addAll(other.getReasons()); + final List newOnCancelCallbacks = new ArrayList<>(onCancelCallbacks); + newOnCancelCallbacks.addAll(other.onCancelCallbacks); + return new TaskCancellation(task, newReasons, newOnCancelCallbacks); + } + /** * Cancels the task and invokes all onCancelCallbacks. */ diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index d80e011c04332..6571bdd8a3431 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -17,6 +17,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.query.IdsQueryBuilder; @@ -97,7 +98,8 @@ public static String getPitId() { return SearchContextId.encode(array.asList(), aliasFilters, version); } - public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException { + public static void assertUsingGetAllPits(Client client, String id, long creationTime, TimeValue keepAlive) throws ExecutionException, + InterruptedException { final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.local(false); clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); @@ -113,6 +115,7 @@ public static void assertUsingGetAllPits(Client client, String id, long creation GetAllPitNodesResponse getPitResponse = execute1.get(); assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id)); 
Assert.assertEquals(getPitResponse.getPitInfos().get(0).getCreationTime(), creationTime); + Assert.assertEquals(getPitResponse.getPitInfos().get(0).getKeepAlive(), keepAlive.getMillis()); } public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException { diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 2661873d9498f..f54c5cfbacd46 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -76,7 +76,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") @@ -106,7 +106,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), response.getId()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); @@ -127,7 +127,7 @@ public void 
testCreatePITWithShardReplicasSuccess() throws ExecutionException, I request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") @@ -229,7 +229,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), pitResponse.getId()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); @@ -412,7 +412,7 @@ public void testPitAfterUpdateIndex() throws Exception { request.setIndices(new String[] { "test" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueMinutes(2)); SearchService service = getInstanceFromNode(SearchService.class); assertThat( @@ -570,7 +570,7 @@ public void testConcurrentSearches() throws Exception { 
request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); - PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime(), TimeValue.timeValueDays(1)); assertSegments(false, client(), pitResponse.getId()); Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index 406c411494d60..c7fbca538c6ee 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; @@ -45,23 +46,36 @@ import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.join.ScoreMode; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.FixedBitSet; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.collect.Tuple; 
import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.KeywordFieldMapper; import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.Mapper; import org.opensearch.index.mapper.NestedPathFieldMapper; import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.index.mapper.ObjectMapper; import org.opensearch.index.mapper.SeqNoFieldMapper; import org.opensearch.index.mapper.Uid; import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.NestedQueryBuilder; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.index.query.support.NestedScope; import org.opensearch.script.MockScriptEngine; import org.opensearch.script.Script; import org.opensearch.script.ScriptEngine; @@ -104,20 +118,34 @@ import java.util.stream.DoubleStream; import java.util.stream.LongStream; +import org.mockito.Mockito; + import static java.util.stream.Collectors.toList; import static org.opensearch.search.aggregations.AggregationBuilders.max; import static org.opensearch.search.aggregations.AggregationBuilders.nested; +import static org.opensearch.search.aggregations.bucket.nested.NestedAggregator.getParentAndChildId; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.equalTo; +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class NestedAggregatorTests extends AggregatorTestCase { private 
static final String VALUE_FIELD_NAME = "number"; + private static final String VALUE_FIELD_NAME2 = "number2"; private static final String NESTED_OBJECT = "nested_object"; private static final String NESTED_OBJECT2 = "nested_object2"; private static final String NESTED_AGG = "nestedAgg"; private static final String MAX_AGG_NAME = "maxAgg"; private static final String SUM_AGG_NAME = "sumAgg"; private static final String INVERSE_SCRIPT = "inverse"; + private static final String OUT_NESTED = "outNested"; + private static final String OUT_TERMS = "outTerms"; + private static final String INNER_NESTED = "innerNested"; + private static final String INNER_TERMS = "innerTerms"; private static final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); @@ -201,17 +229,22 @@ public void testSingleNestingMax() throws IOException { } try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) { NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT); - MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME); + MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME); nestedBuilder.subAggregation(maxAgg); - MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType( + NESTED_OBJECT + "." 
+ VALUE_FIELD_NAME, + NumberFieldMapper.NumberType.LONG + ); InternalNested nested = searchAndReduce( + createIndexSettings(), newSearcher(indexReader, false, true), new MatchAllDocsQuery(), nestedBuilder, + DEFAULT_MAX_BUCKETS, + true, fieldType ); - assertEquals(expectedNestedDocs, nested.getDocCount()); assertEquals(NESTED_AGG, nested.getName()); assertEquals(expectedNestedDocs, nested.getDocCount()); @@ -240,7 +273,7 @@ public void testDoubleNestingMax() throws IOException { int numNestedDocs = randomIntBetween(0, 20); expectedMaxValue = Math.max( expectedMaxValue, - generateMaxDocs(documents, numNestedDocs, i, NESTED_OBJECT + "." + NESTED_OBJECT2, VALUE_FIELD_NAME) + generateMaxDocs(documents, numNestedDocs, i, NESTED_OBJECT, VALUE_FIELD_NAME) ); expectedNestedDocs += numNestedDocs; @@ -253,19 +286,24 @@ public void testDoubleNestingMax() throws IOException { iw.commit(); } try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) { - NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT + "." + NESTED_OBJECT2); - MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME); + NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT); + MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME); nestedBuilder.subAggregation(maxAgg); - MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType( + NESTED_OBJECT + "." 
+ VALUE_FIELD_NAME, + NumberFieldMapper.NumberType.LONG + ); InternalNested nested = searchAndReduce( + createIndexSettings(), newSearcher(indexReader, false, true), new MatchAllDocsQuery(), nestedBuilder, + DEFAULT_MAX_BUCKETS, + true, fieldType ); - assertEquals(expectedNestedDocs, nested.getDocCount()); assertEquals(NESTED_AGG, nested.getName()); assertEquals(expectedNestedDocs, nested.getDocCount()); @@ -310,17 +348,22 @@ public void testOrphanedDocs() throws IOException { } try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) { NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT); - SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME).field(VALUE_FIELD_NAME); + SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME); nestedBuilder.subAggregation(sumAgg); - MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType( + NESTED_OBJECT + "." 
+ VALUE_FIELD_NAME, + NumberFieldMapper.NumberType.LONG + ); InternalNested nested = searchAndReduce( + createIndexSettings(), newSearcher(indexReader, false, true), new MatchAllDocsQuery(), nestedBuilder, + DEFAULT_MAX_BUCKETS, + true, fieldType ); - assertEquals(expectedNestedDocs, nested.getDocCount()); assertEquals(NESTED_AGG, nested.getName()); assertEquals(expectedNestedDocs, nested.getDocCount()); @@ -747,8 +790,24 @@ public void testFieldAlias() throws IOException { max(MAX_AGG_NAME).field(VALUE_FIELD_NAME + "-alias") ); - InternalNested nested = searchAndReduce(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), agg, fieldType); - Nested aliasNested = searchAndReduce(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), aliasAgg, fieldType); + InternalNested nested = searchAndReduce( + createIndexSettings(), + newSearcher(indexReader, false, true), + new MatchAllDocsQuery(), + agg, + DEFAULT_MAX_BUCKETS, + true, + fieldType + ); + Nested aliasNested = searchAndReduce( + createIndexSettings(), + newSearcher(indexReader, false, true), + new MatchAllDocsQuery(), + aliasAgg, + DEFAULT_MAX_BUCKETS, + true, + fieldType + ); assertEquals(nested, aliasNested); assertEquals(expectedNestedDocs, nested.getDocCount()); @@ -796,13 +855,15 @@ public void testNestedWithPipeline() throws IOException { MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG); InternalNested nested = searchAndReduce( + createIndexSettings(), newSearcher(indexReader, false, true), new MatchAllDocsQuery(), nestedBuilder, + DEFAULT_MAX_BUCKETS, + true, fieldType ); - assertEquals(expectedNestedDocs, nested.getDocCount()); assertEquals(NESTED_AGG, nested.getName()); assertEquals(expectedNestedDocs, nested.getDocCount()); @@ -853,6 +914,238 @@ public void testNestedUnderTerms() throws IOException { }, resellersMappedFields()); } + public void testBufferingNestedLeafBucketCollector() throws IOException { + int 
numRootDocs = scaledRandomIntBetween(2, 200); + int expectedNestedDocs; + String[] bucketKeys; + try (Directory directory = newDirectory()) { + try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) { + for (int i = 0; i < numRootDocs; i++) { + + List documents = new ArrayList<>(); + if (randomBoolean()) { + generateDocument(documents, i, NESTED_OBJECT, VALUE_FIELD_NAME, 1); + generateDocument(documents, i, NESTED_OBJECT2, VALUE_FIELD_NAME2, i); + } else { + generateDocument(documents, i, NESTED_OBJECT2, VALUE_FIELD_NAME2, i); + generateDocument(documents, i, NESTED_OBJECT, VALUE_FIELD_NAME, 1); + } + Document document = new Document(); + document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(i)), IdFieldMapper.Defaults.FIELD_TYPE)); + document.add(sequenceIDFields.primaryTerm); + documents.add(document); + iw.addDocuments(documents); + } + iw.commit(); + } + try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) { + IndexSettings indexSettings = createIndexSettings(); + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType( + NESTED_OBJECT + "." + VALUE_FIELD_NAME, + NumberFieldMapper.NumberType.LONG + ); + MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType( + NESTED_OBJECT2 + "." + VALUE_FIELD_NAME2, + NumberFieldMapper.NumberType.LONG + ); + QueryShardContext queryShardContext = createQueryShardContext(NESTED_OBJECT2, indexSettings, fieldType1); + // query + expectedNestedDocs = numRootDocs / 2; + bucketKeys = new String[expectedNestedDocs]; + BytesRef[] values = new BytesRef[numRootDocs / 2]; + for (int i = 0; i < numRootDocs / 2; i++) { + bucketKeys[i] = "" + (i * 2); + values[i] = new BytesRef(bucketKeys[i]); + } + TermsQueryBuilder termsQueryBuilder = new TermsQueryBuilder(NESTED_OBJECT2 + "." 
+ VALUE_FIELD_NAME2, (Object[]) values); + NestedQueryBuilder nestedQueryBuilder = new NestedQueryBuilder(NESTED_OBJECT2, termsQueryBuilder, ScoreMode.None); + + // out nested aggs + NestedAggregationBuilder outNestedBuilder = new NestedAggregationBuilder(OUT_NESTED, NESTED_OBJECT); + TermsAggregationBuilder outTermsAggregator = new TermsAggregationBuilder(OUT_TERMS).field( + NESTED_OBJECT + "." + VALUE_FIELD_NAME + ).size(100); + outNestedBuilder.subAggregation(outTermsAggregator); + + // inner nested aggs + NestedAggregationBuilder innerNestedBuilder = new NestedAggregationBuilder(INNER_NESTED, NESTED_OBJECT2); + TermsAggregationBuilder innerTermsAggregator = new TermsAggregationBuilder(INNER_TERMS).field( + NESTED_OBJECT2 + "." + VALUE_FIELD_NAME2 + ).size(100); + innerNestedBuilder.subAggregation(innerTermsAggregator); + outTermsAggregator.subAggregation(innerNestedBuilder); + + InternalNested nested = searchAndReduce( + indexSettings, + newSearcher(indexReader, false, true), + nestedQueryBuilder.toQuery(queryShardContext), + outNestedBuilder, + DEFAULT_MAX_BUCKETS, + true, + fieldType, + fieldType1 + ); + + assertEquals(OUT_NESTED, nested.getName()); + assertEquals(expectedNestedDocs, nested.getDocCount()); + + LongTerms outTerms = (LongTerms) nested.getProperty(OUT_TERMS); + assertEquals(1, outTerms.getBuckets().size()); + + InternalNested internalNested = (InternalNested) (((Object[]) outTerms.getProperty(INNER_NESTED))[0]); + assertEquals(expectedNestedDocs, internalNested.getDocCount()); + + LongTerms innerTerms = (LongTerms) internalNested.getProperty(INNER_TERMS); + assertEquals(bucketKeys.length, innerTerms.getBuckets().size()); + for (int i = 0; i < expectedNestedDocs; i++) { + LongTerms.Bucket bucket = innerTerms.getBuckets().get(i); + assertEquals(bucketKeys[i], bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + } + } + } + } + + private DocIdSetIterator getDocIdSetIterator(int[] value) { + int[] bits = new int[value[value.length - 
1] + 1]; + for (int i : value) { + bits[i] = 1; + } + return new DocIdSetIterator() { + int index = -1; + + @Override + public int docID() { + if (index == -1 || index > bits.length || bits[index] != 1) { + return -1; + } + return index; + } + + @Override + public int nextDoc() { + for (int i = index; i < bits.length; i++) { + if (bits[i] == 1) { + index = i; + return index; + } + } + index = bits.length; + return NO_MORE_DOCS; + } + + @Override + public int advance(int target) { + for (int i = target; i < bits.length; i++) { + if (bits[i] == 1) { + index = i; + return index; + } + } + index = bits.length; + return NO_MORE_DOCS; + } + + @Override + public long cost() { + return bits.length; + } + }; + } + + public void testGetParentAndChildId() throws IOException { + { + // p: parent c: child + // [p0], [p1], [c2,p3], [c4,x5,p6], [p7], [p8] + BitSet parentDocs = new FixedBitSet(20); + parentDocs.set(0); + parentDocs.set(1); + parentDocs.set(3); + parentDocs.set(6); + parentDocs.set(7); + parentDocs.set(8); + DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 2, 4 }); + + Tuple res = getParentAndChildId(parentDocs, childDocs, 0); + assertEquals(0, res.v1().intValue()); + assertEquals(2, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 3); + assertEquals(3, res.v1().intValue()); + assertEquals(2, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 4); + assertEquals(6, res.v1().intValue()); + assertEquals(4, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 8); + assertEquals(8, res.v1().intValue()); + assertEquals(NO_MORE_DOCS, res.v2().intValue()); + } + + { + // p: parent c: child1 d: child2 + // [p0], [c1,d2,p3], [d4,c5,p6], [c7,d8,p9], [c10,p11] + BitSet parentDocs = new FixedBitSet(20); + parentDocs.set(0); + parentDocs.set(3); + parentDocs.set(6); + parentDocs.set(9); + parentDocs.set(11); + { + DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 1, 5, 7, 10 }); + 
Tuple res = getParentAndChildId(parentDocs, childDocs, 2); + assertEquals(3, res.v1().intValue()); + assertEquals(1, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 4); + assertEquals(6, res.v1().intValue()); + assertEquals(5, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 8); + assertEquals(9, res.v1().intValue()); + assertEquals(7, res.v2().intValue()); + } + + { + DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 2, 4, 8 }); + Tuple res = getParentAndChildId(parentDocs, childDocs, 1); + assertEquals(3, res.v1().intValue()); + assertEquals(2, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 5); + assertEquals(6, res.v1().intValue()); + assertEquals(4, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 7); + assertEquals(9, res.v1().intValue()); + assertEquals(8, res.v2().intValue()); + + res = getParentAndChildId(parentDocs, childDocs, 10); + assertEquals(11, res.v1().intValue()); + assertEquals(NO_MORE_DOCS, res.v2().intValue()); + } + } + } + + protected QueryShardContext createQueryShardContext(String fieldName, IndexSettings indexSettings, MappedFieldType fieldType) { + QueryShardContext queryShardContext = mock(QueryShardContext.class); + when(queryShardContext.nestedScope()).thenReturn(new NestedScope(indexSettings)); + + BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, Mockito.mock(BitsetFilterCache.Listener.class)); + when(queryShardContext.bitsetFilter(any())).thenReturn(bitsetFilterCache.getBitSetProducer(Queries.newNonNestedFilter())); + when(queryShardContext.fieldMapper(anyString())).thenReturn(fieldType); + when(queryShardContext.getSearchQuoteAnalyzer(any())).thenCallRealMethod(); + when(queryShardContext.getSearchAnalyzer(any())).thenCallRealMethod(); + when(queryShardContext.getIndexSettings()).thenReturn(indexSettings); + when(queryShardContext.getObjectMapper(anyString())).thenAnswer(invocation -> { + 
Mapper.BuilderContext context = new Mapper.BuilderContext(indexSettings.getSettings(), new ContentPath()); + return new ObjectMapper.Builder<>(fieldName).nested(ObjectMapper.Nested.newNested()).build(context); + }); + when(queryShardContext.allowExpensiveQueries()).thenReturn(true); + return queryShardContext; + } + public static CheckedConsumer buildResellerData(int numProducts, int numResellers) { return iw -> { for (int p = 0; p < numProducts; p++) { @@ -893,13 +1186,22 @@ private static double[] generateDocuments(List documents, int numNeste document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(id)), IdFieldMapper.Defaults.NESTED_FIELD_TYPE)); document.add(new Field(NestedPathFieldMapper.NAME, path, NestedPathFieldMapper.Defaults.FIELD_TYPE)); long value = randomNonNegativeLong() % 10000; - document.add(new SortedNumericDocValuesField(fieldName, value)); + document.add(new SortedNumericDocValuesField(path + "." + fieldName, value)); documents.add(document); values[nested] = value; } return values; } + private static void generateDocument(List documents, int id, String path, String fieldName, long vales) { + Document document = new Document(); + document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(id)), IdFieldMapper.Defaults.NESTED_FIELD_TYPE)); + document.add(new Field(NestedPathFieldMapper.NAME, path, NestedPathFieldMapper.Defaults.FIELD_TYPE)); + document.add(new SortedNumericDocValuesField(path + "." + fieldName, vales)); + document.add(new LongPoint(path + "." 
+ fieldName, vales)); + documents.add(document); + } + private List generateBook(String id, String[] authors, int[] numPages) { List documents = new ArrayList<>(); diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index 9778798b706f4..43df482fcc2ae 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -23,9 +24,11 @@ import org.opensearch.search.backpressure.stats.SearchBackpressureStats; import org.opensearch.search.backpressure.stats.SearchShardTaskStats; import org.opensearch.search.backpressure.stats.SearchTaskStats; -import org.opensearch.search.backpressure.trackers.NodeDuressTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -42,6 
+45,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,10 +56,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; +import static org.opensearch.search.ResourceType.CPU; +import static org.opensearch.search.ResourceType.JVM; import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyDouble; +import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -89,8 +97,15 @@ public void testIsNodeInDuress() { AtomicReference cpuUsage = new AtomicReference<>(); AtomicReference heapUsage = new AtomicReference<>(); - NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); - NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5); + NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 3); + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, heapUsageTracker); + put(ResourceType.CPU, cpuUsageTracker); + } + }; SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -102,9 +117,9 @@ public void testIsNodeInDuress() { mockTaskResourceTrackingService, threadPool, System::nanoTime, - List.of(cpuUsageTracker, heapUsageTracker), - Collections.emptyList(), - Collections.emptyList(), + new NodeDuressTrackers(duressTrackers), + new TaskResourceUsageTrackers(), + new TaskResourceUsageTrackers(), taskManager ); @@ 
-132,6 +147,8 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -143,15 +160,15 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), - Collections.emptyList(), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), taskManager ); for (int i = 0; i < 100; i++) { // service.onTaskCompleted(new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>())); - service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200, i)); } assertEquals(100, service.getSearchBackpressureState(SearchTask.class).getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); @@ -161,6 +178,8 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + 
taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -172,16 +191,16 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - Collections.emptyList(), - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + new TaskResourceUsageTrackers(), + taskResourceUsageTrackers, taskManager ); // Record task completions to update the tracker state. Tasks other than SearchTask & SearchShardTask are ignored. - service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200, 101)); for (int i = 0; i < 100; i++) { - service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200, i)); } assertEquals(100, service.getSearchBackpressureState(SearchShardTask.class).getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); @@ -192,21 +211,41 @@ public void testSearchTaskInFlightCancellation() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; - NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3); - TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if 
(task.getTotalResourceStats().getCpuTimeInNanos() < 300) { + return Optional.empty(); + } + + return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + } + ); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); // Mocking 'settings' with predictable rate limiting thresholds. SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 5.0); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> false, () -> 3); + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, heapUsageTracker); + put(CPU, mockNodeDuressTracker); + } + }; + SearchBackpressureService service = new SearchBackpressureService( settings, mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - List.of(mockNodeDuressTracker), - List.of(mockTaskResourceUsageTracker), - Collections.emptyList(), + new NodeDuressTrackers(duressTrackers), + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), mockTaskManager ); @@ -225,9 +264,9 @@ public void testSearchTaskInFlightCancellation() { Map activeSearchTasks = new HashMap<>(); for (long i = 0; i < 75; i++) { if (i % 3 == 0) { - activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes)); + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes, i)); } else { - activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes)); + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes, i)); } } doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); @@ -265,9 +304,28 @@ public void testSearchShardTaskInFlightCancellation() { TaskResourceTrackingService mockTaskResourceTrackingService = 
mock(TaskResourceTrackingService.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; - NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3); - TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, mockNodeDuressTracker); + } + }; + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { + return Optional.empty(); + } + + return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + } + ); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); // Mocking 'settings' with predictable rate limiting thresholds. 
SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); @@ -277,9 +335,9 @@ public void testSearchShardTaskInFlightCancellation() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - List.of(mockNodeDuressTracker), - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), + nodeDuressTrackers, + new TaskResourceUsageTrackers(), + taskResourceUsageTrackers, mockTaskManager ); @@ -298,9 +356,9 @@ public void testSearchShardTaskInFlightCancellation() { Map activeSearchShardTasks = new HashMap<>(); for (long i = 0; i < 75; i++) { if (i % 5 == 0) { - activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes, i)); } else { - activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i)); } } doReturn(activeSearchShardTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); @@ -318,7 +376,7 @@ public void testSearchShardTaskInFlightCancellation() { // Simulate task completion to replenish some tokens. // This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'. 
for (int i = 0; i < 20; i++) { - service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i)); } service.doRun(); verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); @@ -335,6 +393,181 @@ public void testSearchShardTaskInFlightCancellation() { assertEquals(expectedStats, actualStats); } + public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() { + TaskManager mockTaskManager = spy(taskManager); + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + AtomicLong mockTime = new AtomicLong(0); + LongSupplier mockTimeNanosSupplier = mockTime::get; + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the + // tasks but heap based cancellation should not happen because heap is not in duress + TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10)) + ); + TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) { + return Optional.empty(); + } + return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5)); + } + ); + + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(cpuUsageTracker, 
TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); + taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER); + + // Mocking 'settings' with predictable rate limiting thresholds. + SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); + + SearchBackpressureService service = new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + nodeDuressTrackers, + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), + mockTaskManager + ); + + service.doRun(); + service.doRun(); + + SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings); + + // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). 
+ Map activeSearchTasks = new HashMap<>(); + for (long i = 0; i < 75; i++) { + if (i % 5 == 0) { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, 800, i)); + } else { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, 800, i)); + } + } + doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); + + // this will trigger cancellation but these cancellations should only be cpu based + service.doRun(); + verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(5, service.getSearchBackpressureState(SearchTask.class).getCancellationCount()); + assertEquals(1, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount()); + + SearchBackpressureStats expectedStats = new SearchBackpressureStats( + new SearchTaskStats( + 5, + 1, + 0, + Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + new MockStats(5), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + new MockStats(0) + ) + ), + new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()), + SearchBackpressureMode.ENFORCED + ); + + SearchBackpressureStats actualStats = service.nodeStats(); + assertEquals(expectedStats, actualStats); + } + + public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation() { + TaskManager mockTaskManager = spy(taskManager); + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + AtomicLong mockTime = new AtomicLong(0); + LongSupplier mockTimeNanosSupplier = mockTime::get; + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the 
+ // tasks but heap based cancellation should not happen because heap is not in duress + TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10)) + ); + TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) { + return Optional.empty(); + } + return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5)); + } + ); + + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); + taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER); + + // Mocking 'settings' with predictable rate limiting thresholds. + SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); + + SearchBackpressureService service = spy( + new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + nodeDuressTrackers, + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), + mockTaskManager + ) + ); + + when(service.isHeapUsageDominatedBySearch(anyList(), anyDouble())).thenReturn(false); + + service.doRun(); + service.doRun(); + + SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings); + + // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). 
+ Map activeSearchTasks = new HashMap<>(); + for (long i = 0; i < 75; i++) { + Class taskType = randomBoolean() ? SearchTask.class : SearchShardTask.class; + if (i % 5 == 0) { + activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 500, 800, i)); + } else { + activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 100, 800, i)); + } + } + doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); + + // this will trigger cancellation but the cancellation should not happen as the node is not in duress because of search traffic + service.doRun(); + + verify(mockTaskManager, times(0)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(0, service.getSearchBackpressureState(SearchTask.class).getCancellationCount()); + assertEquals(0, service.getSearchBackpressureState(SearchShardTask.class).getCancellationCount()); + } + private SearchBackpressureSettings getBackpressureSettings(String mode, double ratio, double rate, double burst) { return spy( new SearchBackpressureSettings( @@ -344,11 +577,14 @@ private SearchBackpressureSettings getBackpressureSettings(String mode, double r ); } - private TaskResourceUsageTracker getMockedTaskResourceUsageTracker() { + private TaskResourceUsageTracker getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType type, + TaskResourceUsageTracker.ResourceUsageBreachEvaluator evaluator + ) { return new TaskResourceUsageTracker() { @Override public String name() { - return TaskResourceUsageTrackerType.CPU_USAGE_TRACKER.getName(); + return type.getName(); } @Override @@ -356,11 +592,7 @@ public void update(Task task) {} @Override public Optional checkAndMaybeGetCancellationReason(Task task) { - if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { - return Optional.empty(); - } - - return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + return evaluator.evaluate(task); } @Override diff --git 
a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java index f28b82cad30d3..45a44136d41f7 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java @@ -12,8 +12,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.AbstractWireSerializingTestCase; import java.util.Map; diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java index cc7aa92826b41..3ac5cfd658fc3 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java @@ -12,8 +12,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.AbstractWireSerializingTestCase; import java.util.Map; diff 
--git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index 8cdcbc7511bd2..0117b0ed71c27 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -33,7 +33,7 @@ public class CpuUsageTrackerTests extends OpenSearchTestCase { ); public void testSearchTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200); + Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -43,7 +43,7 @@ public void testSearchTaskEligibleForCancellation() { } public void testSearchShardTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -53,7 +53,7 @@ public void testSearchShardTaskEligibleForCancellation() { } public void testNotEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); diff --git 
a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index 921d01e7355a7..514f1b4785aa1 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -47,7 +47,7 @@ public void testSearchTaskEligibleForCancellation() { } public void testSearchShardTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0, randomNonNegativeLong()); ElapsedTimeTracker tracker = new ElapsedTimeTracker( mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, () -> 200000000 @@ -60,7 +60,7 @@ public void testSearchShardTaskEligibleForCancellation() { } public void testNotEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000, randomNonNegativeLong()); ElapsedTimeTracker tracker = new ElapsedTimeTracker( mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, () -> 200000000 diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index 3950d00b0c8b5..1c46305e9fda6 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -58,7 +58,7 @@ public void testSearchTaskEligibleForCancellation() { SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE ) ); - Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 
50); + Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50, randomNonNegativeLong()); // Record enough observations to make the moving average 'ready'. for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { @@ -66,7 +66,7 @@ public void testSearchTaskEligibleForCancellation() { } // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). - task = createMockTaskWithResourceStats(SearchTask.class, 1, 300); + task = createMockTaskWithResourceStats(SearchTask.class, 1, 300, randomNonNegativeLong()); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(6, reason.get().getCancellationScore()); @@ -88,7 +88,7 @@ public void testSearchShardTaskEligibleForCancellation() { SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE ) ); - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50, randomNonNegativeLong()); // Record enough observations to make the moving average 'ready'. for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { @@ -96,7 +96,7 @@ public void testSearchShardTaskEligibleForCancellation() { } // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200, randomNonNegativeLong()); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(4, reason.get().getCancellationScore()); @@ -122,7 +122,7 @@ public void testNotEligibleForCancellation() { ); // Task with heap usage < heapBytesThreshold. - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99, randomNonNegativeLong()); // Not enough observations. 
reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -139,7 +139,12 @@ public void testNotEligibleForCancellation() { // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled. double allowedHeapUsage = 99.0 * 2.0; - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); + task = createMockTaskWithResourceStats( + SearchShardTask.class, + 1, + randomLongBetween(99, (long) allowedHeapUsage - 1), + randomNonNegativeLong() + ); reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } @@ -148,12 +153,12 @@ public void testIsHeapUsageDominatedBySearch() { assumeTrue("Skip the test if the hardware doesn't support heap usage tracking", HeapUsageTracker.isHeapTrackingSupported()); // task with 1 byte of heap usage so that it does not breach the threshold - CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1); + CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, randomNonNegativeLong()); assertFalse(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5)); long totalHeap = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); // task with heap usage of [totalHeap - 1] so that it breaches the threshold - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1, randomNonNegativeLong()); assertTrue(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5)); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java index 472ba95566523..32aca6ac3230e 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java +++ 
b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.atomic.AtomicReference; @@ -16,20 +17,20 @@ public class NodeDuressTrackerTests extends OpenSearchTestCase { public void testNodeDuressTracker() { AtomicReference cpuUsage = new AtomicReference<>(0.0); - NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); + NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3); // Node not in duress. - assertEquals(0, tracker.check()); + assertFalse(tracker.test()); // Node in duress; the streak must keep increasing. cpuUsage.set(0.7); - assertEquals(1, tracker.check()); - assertEquals(2, tracker.check()); - assertEquals(3, tracker.check()); + assertFalse(tracker.test()); + assertFalse(tracker.test()); + assertTrue(tracker.test()); // Node not in duress anymore. cpuUsage.set(0.3); - assertEquals(0, tracker.check()); - assertEquals(0, tracker.check()); + assertFalse(tracker.test()); + assertFalse(tracker.test()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java new file mode 100644 index 0000000000000..2db251ee461db --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.search.ResourceType; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.EnumMap; + +public class NodeDuressTrackersTests extends OpenSearchTestCase { + + public void testNodeNotInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 2)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 2)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenHeapInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 1)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenCPUInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 1)); + put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenCPUAndHeapInDuress() { + EnumMap map = new 
EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index e74f89c905499..f08c12ea258ca 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -9,7 +9,7 @@ package org.opensearch.tasks; import org.opensearch.action.search.SearchShardTask; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -69,7 +69,7 @@ public Optional checkAndMaybeGetCancellationReason(Task } @Override - public Stats stats(List activeTasks) { + public TaskResourceUsageTracker.Stats stats(List activeTasks) { return null; } }; diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index f55eb72b7aa28..32f445bf24a41 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -90,7 +90,6 @@ import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; -import static 
org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; @@ -143,7 +142,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex } assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); - assertShardIndexGenerations(repository, blobContainer, repositoryData); + assertShardIndexGenerations(blobContainer, repositoryData); return null; } catch (AssertionError e) { return e; @@ -167,8 +166,7 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe assertTrue(indexGenerations.length <= 2); } - private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData) - throws IOException { + private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { final ShardGenerations shardGenerations = repositoryData.shardGenerations(); final BlobContainer indicesContainer = repoRoot.children().get("indices"); for (IndexId index : shardGenerations.indices()) { @@ -176,22 +174,16 @@ private static void assertShardIndexGenerations(BlobStoreRepository repository, if (gens.isEmpty() == false) { final BlobContainer indexContainer = indicesContainer.children().get(index.getId()); final Map shardContainers = indexContainer.children(); - if (isRemoteSnapshot(repository, repositoryData, index)) { - // If the source of the data is another snapshot (i.e. 
searchable snapshot) - // then assert that there is no shard data (because it exists in the source snapshot) - assertThat(shardContainers, anEmptyMap()); - } else { - for (int i = 0; i < gens.size(); i++) { - final String generation = gens.get(i); - assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); - if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { - final String shardId = Integer.toString(i); - assertThat(shardContainers, hasKey(shardId)); - assertThat( - shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), - hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) - ); - } + for (int i = 0; i < gens.size(); i++) { + final String generation = gens.get(i); + assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); + if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { + final String shardId = Integer.toString(i); + assertThat(shardContainers, hasKey(shardId)); + assertThat( + shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), + hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) + ); } } } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 50b27ec000615..28323a94af721 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -69,6 +69,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.BigArrays; @@ -533,6 
+534,17 @@ protected A searchAndReduc return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes); } + protected A searchAndReduce( + IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... fieldTypes + ) throws IOException { + return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, fieldTypes); + } + /** * Collects all documents that match the provided query {@link Query} and * returns the reduced {@link InternalAggregation}. @@ -547,11 +559,15 @@ protected A searchAndReduc Query query, AggregationBuilder builder, int maxBucket, + boolean hasNested, MappedFieldType... fieldTypes ) throws IOException { final IndexReaderContext ctx = searcher.getTopReaderContext(); final PipelineTree pipelines = builder.buildPipelineTree(); List aggs = new ArrayList<>(); + if (hasNested) { + query = Queries.filtered(query, Queries.newNonNestedFilter()); + } Query rewritten = searcher.rewrite(query); MultiBucketConsumer bucketConsumer = new MultiBucketConsumer( maxBucket, diff --git a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java index af06b1688dca2..8f31f2a60ea86 100644 --- a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java +++ b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java @@ -21,19 +21,21 @@ public class SearchBackpressureTestHelpers extends OpenSearchTestCase { - public static T createMockTaskWithResourceStats(Class type, long cpuUsage, long heapUsage) { - return createMockTaskWithResourceStats(type, cpuUsage, heapUsage, 0); + public static T createMockTaskWithResourceStats(Class type, long cpuUsage, long heapUsage, long taskId) { + return createMockTaskWithResourceStats(type, cpuUsage, 
heapUsage, 0, taskId); } public static T createMockTaskWithResourceStats( Class type, long cpuUsage, long heapUsage, - long startTimeNanos + long startTimeNanos, + long taskId ) { T task = mock(type); when(task.getTotalResourceStats()).thenReturn(new TaskResourceUsage(cpuUsage, heapUsage)); when(task.getStartTimeNanos()).thenReturn(startTimeNanos); + when(task.getId()).thenReturn(randomNonNegativeLong()); AtomicBoolean isCancelled = new AtomicBoolean(false); doAnswer(invocation -> {