diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ba799d40f180..f46c875b8db09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383)) - Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387) - Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641)) +- [Star Tree - Search] Add support for metric aggregations with/without term query ([#15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858)) @@ -33,7 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `protobuf` from 3.22.3 to 3.25.4 ([#15684](https://github.com/opensearch-project/OpenSearch/pull/15684)) - Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863)) - Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862)) -- Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.1 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945)) +- Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.2 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945), [#16406](https://github.com/opensearch-project/OpenSearch/pull/16406)) - Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.10 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307)) - Update protobuf from 3.25.4 to 3.25.5 ([#16011](https://github.com/opensearch-project/OpenSearch/pull/16011)) - Bump `org.roaringbitmap:RoaringBitmap` from 1.2.1 to 1.3.0 ([#16040](https://github.com/opensearch-project/OpenSearch/pull/16040)) @@ -87,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331)) - [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370) - Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201)) +- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378)) ### Security diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 535fbbed736e6..a28aa7ee4cdf2 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -61,7 +61,7 @@ dependencies { // Start of transitive dependencies for azure-identity api 'com.microsoft.azure:msal4j-persistence-extension:1.3.0' api "net.java.dev.jna:jna-platform:${versions.jna}" - api 'com.microsoft.azure:msal4j:1.17.1' + api 'com.microsoft.azure:msal4j:1.17.2' api 'com.nimbusds:oauth2-oidc-sdk:11.19.1' api 'com.nimbusds:nimbus-jose-jwt:9.41.1' api 
'com.nimbusds:content-type:2.3' diff --git a/plugins/repository-azure/licenses/msal4j-1.17.1.jar.sha1 b/plugins/repository-azure/licenses/msal4j-1.17.1.jar.sha1 deleted file mode 100644 index 46c14e819b630..0000000000000 --- a/plugins/repository-azure/licenses/msal4j-1.17.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4eb31a9919d9b103c548af7e37e6f9d9f6e46dbc \ No newline at end of file diff --git a/plugins/repository-azure/licenses/msal4j-1.17.2.jar.sha1 b/plugins/repository-azure/licenses/msal4j-1.17.2.jar.sha1 new file mode 100644 index 0000000000000..b5219ee17e9fa --- /dev/null +++ b/plugins/repository-azure/licenses/msal4j-1.17.2.jar.sha1 @@ -0,0 +1 @@ +a6211e3d71d0388929babaa0ff0951b30d001852 \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index 8b6869aa1d81a..123277a3780a2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -619,9 +619,9 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws ); assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS); assertTrue( - exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request") + exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request") ); - }, 1, TimeUnit.MINUTES); + }); // across multiple snapshots assertBusy(() -> { @@ -636,13 +636,13 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws ); assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS); assertTrue( - exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request") + exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request") ); - }, 1, TimeUnit.MINUTES); + }); logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value"); updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey())); - assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } public void testSnapshotStatusForIndexFilter() throws Exception { @@ -666,6 +666,7 @@ public void testSnapshotStatusForIndexFilter() throws Exception { String snapshot = "test-snap-1"; createSnapshot(repositoryName, snapshot, List.of(index1, index2, index3)); + // for a completed snapshot assertBusy(() -> { SnapshotStatus snapshotsStatus = client().admin() .cluster() @@ -682,6 +683,96 @@ public void testSnapshotStatusForIndexFilter() throws Exception { }, 1, TimeUnit.MINUTES); } + public void testSnapshotStatusForIndexFilterForInProgressSnapshot() throws Exception { + String repositoryName = "test-repo"; + createRepository(repositoryName, "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true)); + + logger.info("Create indices"); + String index1 = "test-idx-1"; + String index2 = "test-idx-2"; + String index3 = "test-idx-3"; + createIndex(index1, index2, index3); + ensureGreen(); + + logger.info("Indexing some data"); + for (int i = 0; i < 10; i++) { + index(index1, "_doc", Integer.toString(i), 
"foo", "bar" + i); + index(index2, "_doc", Integer.toString(i), "foo", "baz" + i); + index(index3, "_doc", Integer.toString(i), "foo", "baz" + i); + } + refresh(); + String inProgressSnapshot = "test-in-progress-snapshot"; + + logger.info("Create snapshot"); + ActionFuture createSnapshotResponseActionFuture = startFullSnapshot(repositoryName, inProgressSnapshot); + + logger.info("Block data node"); + waitForBlockOnAnyDataNode(repositoryName, TimeValue.timeValueMinutes(1)); + awaitNumberOfSnapshotsInProgress(1); + + // test normal functioning of index filter for in progress snapshot + assertBusy(() -> { + SnapshotStatus snapshotsStatus = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .setIndices(index1, index2) + .get() + .getSnapshots() + .get(0); + Map snapshotIndexStatusMap = snapshotsStatus.getIndices(); + // Although the snapshot contains 3 indices, the response of status api call only contains results for 2 + assertEquals(snapshotIndexStatusMap.size(), 2); + assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2)); + }); + + // when a non-existent index is requested in the index-filter + assertBusy(() -> { + // failure due to index not found in snapshot + final String nonExistentIndex1 = "non-existent-index-1"; + final String nonExistentIndex2 = "non-existent-index-2"; + Exception ex = expectThrows( + Exception.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2) + .execute() + .actionGet() + ); + String cause = String.format( + Locale.ROOT, + "indices [%s] missing in snapshot [%s] of repository [%s]", + String.join(", ", List.of(nonExistentIndex2, nonExistentIndex1)), + inProgressSnapshot, + repositoryName + ); + assertEquals(cause, ex.getCause().getMessage()); + + // no error for ignore_unavailable = true and status response contains only the found indices + SnapshotStatus snapshotsStatus = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2) + .setIgnoreUnavailable(true) + .get() + .getSnapshots() + .get(0); + + Map snapshotIndexStatusMap = snapshotsStatus.getIndices(); + assertEquals(snapshotIndexStatusMap.size(), 2); + assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2)); + }); + + logger.info("Unblock data node"); + unblockAllDataNodes(repositoryName); + + logger.info("Wait for snapshot to finish"); + waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); + } + public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { String repositoryName = "test-repo"; String index1 = "test-idx-1"; @@ -705,6 +796,39 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3)); createSnapshot(repositoryName, snapshot2, List.of(index1)); + assertBusy(() -> { + // failure due to passing index filter for _all value of repository param + Exception ex = expectThrows( + Exception.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus("_all") + .setSnapshots(snapshot1) + .setIndices(index1, index2, index3) + .execute() + .actionGet() + ); + String cause = + "index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]"; + 
assertTrue(ex.getMessage().contains(cause)); + }); + + assertBusy(() -> { + // failure due to passing index filter for _all value of snapshot param --> gets translated as a blank array + Exception ex = expectThrows( + Exception.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots() + .setIndices(index1, index2, index3) + .execute() + .actionGet() + ); + String cause = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [_all]"; + assertTrue(ex.getMessage().contains(cause)); + }); + assertBusy(() -> { // failure due to passing index filter for multiple snapshots ActionRequestValidationException ex = expectThrows( @@ -717,9 +841,10 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { .execute() .actionGet() ); - String cause = "index list filter is supported only for a single snapshot"; + String cause = + "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [[test-snap-1, test-snap-2]]"; assertTrue(ex.getMessage().contains(cause)); - }, 1, TimeUnit.MINUTES); + }); assertBusy(() -> { // failure due to index not found in snapshot @@ -743,7 +868,18 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { ); assertEquals(cause, ex.getCause().getMessage()); - }, 1, TimeUnit.MINUTES); + // no error for ignore_unavailable = true and status response contains only the found indices + SnapshotStatus snapshotsStatus = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshot2) + .setIndices(index1, index2, index3) + .setIgnoreUnavailable(true) + .get() + .getSnapshots() + .get(0); + assertEquals(1, snapshotsStatus.getIndices().size()); + }); assertBusy(() -> { // failure due to too many shards requested @@ -763,12 +899,148 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception { .actionGet() ); assertEquals(ex.status(), RestStatus.TOO_MANY_REQUESTS); - assertTrue(ex.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")); + assertTrue(ex.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request")); logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value"); updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey())); - assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - }, 2, TimeUnit.MINUTES); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + }); + } + + public void testSnapshotStatusShardLimitOfResponseForInProgressSnapshot() throws Exception { + logger.info("Create repository"); + String repositoryName = "test-repo"; + createRepository( + repositoryName, + "mock", + Settings.builder() + .put("location", randomRepoPath()) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("wait_after_unblock", 200) + ); + + logger.info("Create indices"); + String index1 = "test-idx-1"; + String index2 = "test-idx-2"; + String index3 = "test-idx-3"; + assertAcked(prepareCreate(index1, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + assertAcked(prepareCreate(index3, 1, 
Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + ensureGreen(); + + logger.info("Index some data"); + indexRandomDocs(index1, 10); + indexRandomDocs(index2, 10); + indexRandomDocs(index3, 10); + + logger.info("Create completed snapshot"); + String completedSnapshot = "test-completed-snapshot"; + String blockedNode = blockNodeWithIndex(repositoryName, index1); + client().admin().cluster().prepareCreateSnapshot(repositoryName, completedSnapshot).setWaitForCompletion(false).get(); + waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60)); + unblockNode(repositoryName, blockedNode); + waitForCompletion(repositoryName, completedSnapshot, TimeValue.timeValueSeconds(60)); + + logger.info("Index some more data"); + indexRandomDocs(index1, 10); + indexRandomDocs(index2, 10); + indexRandomDocs(index3, 10); + refresh(); + + logger.info("Create in-progress snapshot"); + String inProgressSnapshot = "test-in-progress-snapshot"; + blockedNode = blockNodeWithIndex(repositoryName, index1); + client().admin().cluster().prepareCreateSnapshot(repositoryName, inProgressSnapshot).setWaitForCompletion(false).get(); + waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60)); + List snapshotStatuses = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot, completedSnapshot) + .get() + .getSnapshots(); + + assertEquals(2, snapshotStatuses.size()); + assertEquals(SnapshotsInProgress.State.STARTED, snapshotStatuses.get(0).getState()); + assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatuses.get(1).getState()); + + logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value"); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 1)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // shard limit exceeded due to inProgress snapshot alone @ without index-filter + assertBusy(() -> { + CircuitBreakingException exception = expectThrows( + CircuitBreakingException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .execute() + .actionGet() + ); + assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS); + assertTrue( + exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request") + ); + }); + + // shard limit exceeded due to inProgress snapshot alone @ with index-filter + assertBusy(() -> { + CircuitBreakingException exception = expectThrows( + CircuitBreakingException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .setIndices(index1, index2) + .execute() + .actionGet() + ); + assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS); + assertTrue( + exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request") + ); + }); + + logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a slightly higher value"); + updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 5)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // shard limit exceeded due to passing for inProgress but failing for current + completed + + assertBusy(() -> { + 
SnapshotStatus inProgressSnapshotStatus = client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot) + .get() + .getSnapshots() + .get(0); + assertEquals(3, inProgressSnapshotStatus.getShards().size()); + + CircuitBreakingException exception = expectThrows( + CircuitBreakingException.class, + () -> client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(inProgressSnapshot, completedSnapshot) + .execute() + .actionGet() + ); + assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS); + assertTrue( + exception.getMessage().contains(" is more than the maximum allowed value of shard count [5] for snapshot status request") + ); + }); + + unblockNode(repositoryName, blockedNode); + waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); + + logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value"); + updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey())); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java index 3d7fb5b6beb56..a270dcfa53474 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java @@ -41,6 +41,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Arrays; import static org.opensearch.action.ValidateActions.addValidationError; @@ -124,8 +125,20 @@ public ActionRequestValidationException validate() { if (snapshots == null) { validationException = addValidationError("snapshots is null", validationException); } - if (indices.length != 0 && snapshots.length != 1) { - validationException = addValidationError("index list filter is supported only for a single snapshot", validationException); + if (indices.length != 0) { + if (repository.equals("_all")) { + String error = + "index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]"; + validationException = addValidationError(error, validationException); + } + if (snapshots.length != 1) { + // snapshot param was '_all' (length = 0) or a list of snapshots (length > 1) + String snapshotParamValue = snapshots.length == 0 ? 
"_all" : Arrays.toString(snapshots); + String error = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [" + + snapshotParamValue + + "]"; + validationException = addValidationError(error, validationException); + } } return validationException; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 8228cb6301c8c..2c8b06bb5e8fe 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -99,8 +99,14 @@ public class TransportSnapshotsStatusAction extends TransportClusterManagerNodeA private final TransportNodesSnapshotsStatus transportNodesSnapshotsStatus; + private Set requestedIndexNames; + private long maximumAllowedShardCount; + private int totalShardsRequiredInResponse; + + private boolean requestUsesIndexFilter; + @Inject public TransportSnapshotsStatusAction( TransportService transportService, @@ -145,25 +151,21 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) throws Exception { + setupForRequest(request); + final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); List currentSnapshots = SnapshotsService.currentSnapshots( snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()) ); + if (currentSnapshots.isEmpty()) { buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener); return; } - Set nodesIds = new HashSet<>(); - for (SnapshotsInProgress.Entry entry : currentSnapshots) { - for (final SnapshotsInProgress.ShardSnapshotStatus status : entry.shards().values()) { - if (status.nodeId() != null) { - nodesIds.add(status.nodeId()); - } - } - } + Set nodesIds = getNodeIdsOfCurrentSnapshots(request, currentSnapshots); if (!nodesIds.isEmpty()) { // There are still some snapshots running - check their progress @@ -192,6 +194,97 @@ protected void clusterManagerOperation( } + private void setupForRequest(SnapshotsStatusRequest request) { + requestedIndexNames = new HashSet<>(Arrays.asList(request.indices())); + requestUsesIndexFilter = requestedIndexNames.isEmpty() == false; + totalShardsRequiredInResponse = 0; + maximumAllowedShardCount = clusterService.getClusterSettings().get(MAX_SHARDS_ALLOWED_IN_STATUS_API); + } + + /* + * To get the node IDs of the relevant (according to the index filter) shards which are part of current snapshots + * It also deals with any missing indices (for index-filter case) and calculates the number of shards contributed by all + * the current snapshots to the total count (irrespective of index-filter) + * If this count exceeds the limit, CircuitBreakingException is thrown + * */ + private Set getNodeIdsOfCurrentSnapshots(final SnapshotsStatusRequest request, List currentSnapshots) + throws CircuitBreakingException { + Set nodesIdsOfCurrentSnapshotShards = new HashSet<>(); + int totalShardsAcrossCurrentSnapshots = 0; + + for (SnapshotsInProgress.Entry currentSnapshotEntry : currentSnapshots) { + if (currentSnapshotEntry.remoteStoreIndexShallowCopyV2()) { + // skip current shallow v2 snapshots + continue; + } + if (requestUsesIndexFilter) { + // index-filter is allowed only for a single snapshot, which 
has to be this one + // first check if any requested indices are missing from this current snapshot + + final Set indicesInCurrentSnapshot = currentSnapshotEntry.indices() + .stream() + .map(IndexId::getName) + .collect(Collectors.toSet()); + + final Set indicesNotFound = requestedIndexNames.stream() + .filter(index -> indicesInCurrentSnapshot.contains(index) == false) + .collect(Collectors.toSet()); + + if (indicesNotFound.isEmpty() == false) { + handleIndexNotFound( + requestedIndexNames, + indicesNotFound, + request, + currentSnapshotEntry.snapshot().getSnapshotId().getName(), + false + ); + } + // the actual no. of shards contributed by this current snapshot will now be calculated + } else { + // all shards of this current snapshot are required in response + totalShardsAcrossCurrentSnapshots += currentSnapshotEntry.shards().size(); + } + + for (final Map.Entry shardStatusEntry : currentSnapshotEntry.shards() + .entrySet()) { + SnapshotsInProgress.ShardSnapshotStatus shardStatus = shardStatusEntry.getValue(); + boolean indexPresentInFilter = requestedIndexNames.contains(shardStatusEntry.getKey().getIndexName()); + + if (requestUsesIndexFilter && indexPresentInFilter) { + // count only those shards whose index belongs to the index-filter + totalShardsAcrossCurrentSnapshots++; + + // for non-index filter case, we already counted all the shards of this current snapshot (non-shallow v2) + } + + if (shardStatus.nodeId() != null) { + if (requestUsesIndexFilter) { + if (indexPresentInFilter) { + // include node only if the index containing this shard belongs to the index filter + nodesIdsOfCurrentSnapshotShards.add(shardStatus.nodeId()); + } + } else { + nodesIdsOfCurrentSnapshotShards.add(shardStatus.nodeId()); + } + } + } + } + + totalShardsRequiredInResponse += totalShardsAcrossCurrentSnapshots; + if (totalShardsRequiredInResponse > maximumAllowedShardCount) { + // index-filter is allowed only for a single snapshot. 
If index-filter is being used and limit got exceeded, + // this snapshot is current and its relevant indices contribute more shards than the limit + + // if index-filter is not being used and limit got exceeded, there could be more shards required in response coming from completed + // snapshots + // but since the limit is already exceeded, we can fail the request here + boolean couldInvolveMoreShards = requestUsesIndexFilter == false; + handleMaximumAllowedShardCountExceeded(request.repository(), totalShardsRequiredInResponse, couldInvolveMoreShards); + } + + return nodesIdsOfCurrentSnapshotShards; + } + private void buildResponse( SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request, @@ -215,6 +308,10 @@ private void buildResponse( List shardStatusBuilder = new ArrayList<>(); Map indexIdLookup = null; for (final Map.Entry shardEntry : entry.shards().entrySet()) { + if (requestUsesIndexFilter && requestedIndexNames.contains(shardEntry.getKey().getIndexName()) == false) { + // skip shard if its index does not belong to the index-filter + continue; + } SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue(); if (status.nodeId() != null) { // We should have information about this shard from the shard: @@ -320,7 +417,6 @@ private void loadRepositoryData( String repositoryName, ActionListener listener ) { - maximumAllowedShardCount = clusterService.getClusterSettings().get(MAX_SHARDS_ALLOWED_IN_STATUS_API); final StepListener repositoryDataListener = new StepListener<>(); repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); repositoryDataListener.whenComplete(repositoryData -> { @@ -343,8 +439,7 @@ private void loadRepositoryData( snapshotInfo ); boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; - long initialSnapshotTotalSize = 0; - if (isShallowV2Snapshot && request.indices().length == 0) { + if (isShallowV2Snapshot && requestUsesIndexFilter == false) { // TODO: add primary store size in bytes at the snapshot level } @@ -430,7 +525,10 @@ private Map snapshotsInfo( .stream() .filter(s -> requestedSnapshotNames.contains(s.getName())) .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); + + // for the no-index-filter case; excludes shards from shallow v2 snapshots int totalShardsAcrossSnapshots = 0; + for (final String snapshotName : request.snapshots()) { if (currentSnapshotNames.contains(snapshotName)) { // we've already found this snapshot in the current snapshot entries, so skip over @@ -453,23 +551,15 @@ } SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId); boolean isV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; - if (isV2Snapshot == false && request.indices().length == 0) { + if (isV2Snapshot == false && requestUsesIndexFilter == false) { totalShardsAcrossSnapshots += snapshotInfo.totalShards(); } snapshotsInfoMap.put(snapshotId, snapshotInfo); } - if (totalShardsAcrossSnapshots > maximumAllowedShardCount && request.indices().length == 0) { - String message = "[" - + repositoryName - + ":" - + String.join(", ", request.snapshots()) - + "]" - + " Total shard count [" - + totalShardsAcrossSnapshots - + "] is more than the maximum allowed value of shard count [" - + maximumAllowedShardCount - + "] for snapshot status request"; - throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT); + totalShardsRequiredInResponse += totalShardsAcrossSnapshots; + if (totalShardsRequiredInResponse > maximumAllowedShardCount && 
requestUsesIndexFilter == false) { + // includes shard contributions from all snapshots (current and completed) + handleMaximumAllowedShardCountExceeded(repositoryName, totalShardsRequiredInResponse, false); } return unmodifiableMap(snapshotsInfoMap); } @@ -492,52 +582,46 @@ private Map snapshotShards( final RepositoryData repositoryData, final SnapshotInfo snapshotInfo ) throws IOException { - final Set requestedIndexNames = Sets.newHashSet(request.indices()); String snapshotName = snapshotInfo.snapshotId().getName(); - Set indices = Sets.newHashSet(snapshotInfo.indices()); - if (requestedIndexNames.isEmpty() == false) { - Set finalIndices = indices; - List indicesNotFound = requestedIndexNames.stream() - .filter(i -> finalIndices.contains(i) == false) - .collect(Collectors.toList()); + Set indicesToProcess; + if (requestUsesIndexFilter) { + Set snapshotIndices = Sets.newHashSet(snapshotInfo.indices()); + Set indicesNotFound = requestedIndexNames.stream() + .filter(index -> snapshotIndices.contains(index) == false) + .collect(Collectors.toSet()); if (indicesNotFound.isEmpty() == false) { - handleIndexNotFound(String.join(", ", indicesNotFound), request, snapshotName, repositoryName); + boolean moreMissingIndicesPossible = indicesNotFound.size() == requestedIndexNames.size(); + handleIndexNotFound(requestedIndexNames, indicesNotFound, request, snapshotName, moreMissingIndicesPossible); } - indices = requestedIndexNames; + indicesToProcess = requestedIndexNames; + } else { + // all indices of this snapshot + indicesToProcess = Sets.newHashSet(snapshotInfo.indices()); } final Repository repository = repositoriesService.repository(repositoryName); boolean isV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; + + // for index filter-case and excludes shards from shallow v2 snapshots int totalShardsAcrossIndices = 0; final Map indexMetadataMap = new HashMap<>(); - - for (String index : indices) { + for (String index : indicesToProcess) { IndexId indexId = repositoryData.resolveIndexId(index); IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); if (indexMetadata != null) { - if (requestedIndexNames.isEmpty() == false && isV2Snapshot == false) { + if (requestUsesIndexFilter && isV2Snapshot == false) { totalShardsAcrossIndices += indexMetadata.getNumberOfShards(); } indexMetadataMap.put(indexId, indexMetadata); - } else if (requestedIndexNames.isEmpty() == false) { - handleIndexNotFound(index, request, snapshotName, repositoryName); + } else if (requestUsesIndexFilter) { + handleIndexNotFound(indicesToProcess, Collections.singleton(index), request, snapshotName, true); } } - if (totalShardsAcrossIndices > maximumAllowedShardCount && requestedIndexNames.isEmpty() == false && isV2Snapshot == false) { - String message = "[" - + repositoryName - + ":" - + String.join(", ", request.snapshots()) - + "]" - + " Total shard count [" - + totalShardsAcrossIndices - + "] across the requested indices [" - + requestedIndexNames.stream().collect(Collectors.joining(", ")) - + "] is more than the maximum allowed value of shard count [" - + maximumAllowedShardCount - + "] for snapshot status request"; - throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT); + totalShardsRequiredInResponse += totalShardsAcrossIndices; + if (totalShardsRequiredInResponse > maximumAllowedShardCount && requestUsesIndexFilter && isV2Snapshot == false) { + // index-filter is allowed only for a single snapshot, which has to be this one + 
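// couldInvolveMoreShards is false here: with an index filter, the shard count for this single snapshot is already exact + 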
handleMaximumAllowedShardCountExceeded(request.repository(), totalShardsRequiredInResponse, false); } final Map shardStatus = new HashMap<>(); @@ -563,7 +647,6 @@ private Map snapshotShards( // could not be taken due to partial being set to false. shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped"); } else { - // TODO: to be refactored later if (isV2Snapshot) { shardSnapshotStatus = IndexShardSnapshotStatus.newDone(0, 0, 0, 0, 0, 0, null); } else { @@ -578,21 +661,55 @@ private Map snapshotShards( return unmodifiableMap(shardStatus); } - private void handleIndexNotFound(String index, SnapshotsStatusRequest request, String snapshotName, String repositoryName) { + private void handleIndexNotFound( + Set indicesToProcess, + Set indicesNotFound, + SnapshotsStatusRequest request, + String snapshotName, + boolean moreMissingIndicesPossible + ) throws IndexNotFoundException { + String indices = String.join(", ", indicesNotFound); + if (moreMissingIndicesPossible) { + indices = indices.concat(" and possibly more indices"); + } if (request.ignoreUnavailable()) { - // ignoring unavailable index + // ignoring unavailable indices logger.debug( "snapshot status request ignoring indices [{}], not found in snapshot[{}] in repository [{}]", - index, + indices, snapshotName, - repositoryName + request.repository() ); + + // remove unavailable indices from the set to be processed + indicesToProcess.removeAll(indicesNotFound); } else { - String cause = "indices [" + index + "] missing in snapshot [" + snapshotName + "] of repository [" + repositoryName + "]"; - throw new IndexNotFoundException(index, new IllegalArgumentException(cause)); + String cause = "indices [" + + indices + + "] missing in snapshot [" + + snapshotName + + "] of repository [" + + request.repository() + + "]"; + throw new IndexNotFoundException(indices, new IllegalArgumentException(cause)); } } + private void handleMaximumAllowedShardCountExceeded(String repositoryName, int totalContributingShards, boolean couldInvolveMoreShards) + throws CircuitBreakingException { + String shardCount = "[" + totalContributingShards + (couldInvolveMoreShards ? "+" : "") + "]"; + String message = "[" + + repositoryName + + "] Total shard count " + + shardCount + + " is more than the maximum allowed value of shard count [" + + maximumAllowedShardCount + + "] for snapshot status request. 
Try narrowing down the request by using a snapshot list or " + + "an index list for a singular snapshot."; + + throw new CircuitBreakingException(message, CircuitBreaker.Durability.PERMANENT); + } + private static SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { for (SnapshotShardFailure shardFailure : shardFailures) { if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { diff --git a/server/src/main/java/org/opensearch/action/resync/ResyncReplicationRequest.java b/server/src/main/java/org/opensearch/action/resync/ResyncReplicationRequest.java index 6a4f2f0607144..a7d6c0abb8705 100644 --- a/server/src/main/java/org/opensearch/action/resync/ResyncReplicationRequest.java +++ b/server/src/main/java/org/opensearch/action/resync/ResyncReplicationRequest.java @@ -103,7 +103,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, operations); + return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, Arrays.hashCode(operations)); } @Override diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java index ee6d5b4680c73..8feb9ccd27dbd 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.common.Rounding; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.time.DateUtils; @@ -169,4 +170,8 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) { public static List getSortedDateTimeUnits(List dateTimeUnits) { return dateTimeUnits.stream().sorted(new DateTimeUnitComparator()).collect(Collectors.toList()); } + + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java index cfa8d3a2a8164..3d71b38881693 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/Dimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.xcontent.ToXContent; @@ -42,4 +43,6 @@ public interface Dimension extends ToXContent { * Returns the list of dimension fields that represent the dimension */ List getSubDimensionNames(); + + DocValuesType getDocValuesType(); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java index acc14f5f05c68..f1d1b15337f4a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/NumericDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import 
org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.CompositeDataCubeFieldType; @@ -71,4 +72,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(field); } + + @Override + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java index be3667f10b6da..0e2ec086abc0a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/ReadDimension.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube; +import org.apache.lucene.index.DocValuesType; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.CompositeDataCubeFieldType; @@ -69,4 +70,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(field); } + + @Override + public DocValuesType getDocValuesType() { + return DocValuesType.SORTED_NUMERIC; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java new file mode 100644 index 0000000000000..e538be5d5bece --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSource; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.StarTreeFilter; +import org.opensearch.search.startree.StarTreeQueryContext; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Helper class for building star-tree query + * + * @opensearch.internal + * @opensearch.experimental + */ +public class StarTreeQueryHelper { + + /** + * Checks if the search context can be supported by star-tree + */ + public static boolean isStarTreeSupported(SearchContext context) { + return context.aggregations() != null && context.mapperService().isCompositeIndexPresent() && context.parsedPostFilter() == null; + } + + /** + * Gets StarTreeQueryContext from the search context and source builder. + * Returns null if the query and aggregation cannot be supported. + */ + public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context, SearchSourceBuilder source) throws IOException { + // Current implementation assumes only single star-tree is supported + CompositeDataCubeFieldType compositeMappedFieldType = (CompositeDataCubeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) { + MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory); + if (metricStat == null) { + return null; + } + } + + // need to cache star tree values only for multiple aggregations + boolean cacheStarTreeValues = context.aggregations().factories().getFactories().length > 1; + int cacheSize = cacheStarTreeValues ? 
context.indexShard().segments(false).size() : -1; + + return StarTreeQueryHelper.tryCreateStarTreeQueryContext(starTree, compositeMappedFieldType, source.query(), cacheSize); + } + + /** + * Uses query builder and composite index info to form star-tree query context + */ + private static StarTreeQueryContext tryCreateStarTreeQueryContext( + CompositeIndexFieldInfo compositeIndexFieldInfo, + CompositeDataCubeFieldType compositeFieldType, + QueryBuilder queryBuilder, + int cacheStarTreeValuesSize + ) { + Map queryMap; + if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) { + queryMap = null; + } else if (queryBuilder instanceof TermQueryBuilder) { + // TODO: Add support for keyword fields + if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) { + // return null for non-numeric fields + return null; + } + + List supportedDimensions = compositeFieldType.getDimensions() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + queryMap = getStarTreePredicates(queryBuilder, supportedDimensions); + if (queryMap == null) { + return null; + } + } else { + return null; + } + return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize); + } + + /** + * Parse query body to star-tree predicates + * @param queryBuilder to match star-tree supported query shape + * @return predicates to match + */ + private static Map getStarTreePredicates(QueryBuilder queryBuilder, List supportedDimensions) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + if (!supportedDimensions.contains(field)) { + return null; + } + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Create a map with the field and the value + Map predicateMap = new HashMap<>(); + predicateMap.put(field, inputQueryVal); + return predicateMap; + } + + private static MetricStat validateStarTreeMetricSupport( + CompositeDataCubeFieldType compositeIndexFieldInfo, + AggregatorFactory aggregatorFactory + ) { + if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) { + String field; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat(); + field = ((MetricAggregatorFactory) aggregatorFactory).getField(); + + if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) { + return metricStat; + } + } + return null; + } + + public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) { + StarTreeQueryContext starTreeQueryContext = context.getStarTreeQueryContext(); + return (starTreeQueryContext != null) ? 
starTreeQueryContext.getStarTree() : null; + } + + public static StarTreeValues getStarTreeValues(LeafReaderContext context, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) { + return null; + } + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + return (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } + + /** + * Get the star-tree leaf collector. + * This collector computes the aggregation eagerly from star-tree metric values and then invokes an early-termination collector. + */ + public static LeafBucketCollector getStarTreeLeafCollector( + SearchContext context, + ValuesSource.Numeric valuesSource, + LeafReaderContext ctx, + LeafBucketCollector sub, + CompositeIndexFieldInfo starTree, + String metric, + Consumer valueConsumer, + Runnable finalConsumer + ) throws IOException { + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + assert starTreeValues != null; + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, metric); + + SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator( + metricName + ); + // Obtain a FixedBitSet of matched star tree document IDs + FixedBitSet filteredValues = getStarTreeFilteredValues(context, ctx, starTreeValues); + assert filteredValues != null; + + int numBits = filteredValues.length(); // length of the bit set (total star-tree entries), not the count of matching docs + if (numBits > 0) { + // Iterate over the filtered values + for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits) + ? 
filteredValues.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + // Advance to the entryId in the valuesIterator + if (valuesIterator.advanceExact(bit) == false) { + continue; // Skip if no more entries + } + + // Iterate over the values for the current entryId + for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) { + long value = valuesIterator.nextValue(); + valueConsumer.accept(value); // Apply the consumer operation (e.g., max, sum) + } + } + } + + // Call the final consumer after processing all entries + finalConsumer.run(); + + // Return a LeafBucketCollector that terminates collection + return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) { + @Override + public void collect(int doc, long bucket) { + throw new CollectionTerminatedException(); + } + }; + } + + /** + * Get the filtered values for the star-tree query + * Cache the results in case of multiple aggregations (if cache is initialized) + * @return FixedBitSet of matched document IDs + */ + public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafReaderContext ctx, StarTreeValues starTreeValues) + throws IOException { + FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx); + if (result == null) { + result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap()); + context.getStarTreeQueryContext().setStarTreeValues(ctx, result); + } + return result; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java index 27afdf1479b4e..4b4bfa6a915eb 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/iterator/SortedNumericStarTreeValuesIterator.java @@ -29,4 +29,12 @@ public SortedNumericStarTreeValuesIterator(DocIdSetIterator docIdSetIterator) { public long nextValue() throws IOException { return ((SortedNumericDocValues) docIdSetIterator).nextValue(); } + + public int entryValueCount() throws IOException { + return ((SortedNumericDocValues) docIdSetIterator).docValueCount(); + } + + public boolean advanceExact(int target) throws IOException { + return ((SortedNumericDocValues) docIdSetIterator).advanceExact(target); + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java index 154ff5b0aaec1..511115641fb69 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestSnapshotsStatusAction.java @@ -82,6 +82,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC snapshots = Strings.EMPTY_ARRAY; } String[] indices = request.paramAsStringArray("index", Strings.EMPTY_ARRAY); + if (indices.length == 1 && "_all".equalsIgnoreCase(indices[0])) { + indices = Strings.EMPTY_ARRAY; + } SnapshotsStatusRequest snapshotsStatusRequest = snapshotsStatusRequest(repository).snapshots(snapshots).indices(indices); snapshotsStatusRequest.ignoreUnavailable(request.paramAsBoolean("ignore_unavailable", 
snapshotsStatusRequest.ignoreUnavailable())); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 49042219b100f..b20f8222d6b7a 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -78,6 +78,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -138,6 +139,7 @@ import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.sort.SortBuilder; import org.opensearch.search.sort.SortOrder; +import org.opensearch.search.startree.StarTreeQueryContext; import org.opensearch.search.suggest.Suggest; import org.opensearch.search.suggest.completion.CompletionSuggestion; import org.opensearch.tasks.TaskResourceTrackingService; @@ -165,6 +167,7 @@ import static org.opensearch.common.unit.TimeValue.timeValueHours; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; +import static org.opensearch.search.internal.SearchContext.TRACK_TOTAL_HITS_DISABLED; /** * The main search service @@ -1359,6 +1362,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1372,7 +1376,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders); context.parsedPostFilter(queryShardContext.toQuery(source.postFilter())); } - if (innerHitBuilders.size() > 0) { + if (!innerHitBuilders.isEmpty()) { for (Map.Entry entry : innerHitBuilders.entrySet()) { try { entry.getValue().build(context, context.innerHits()); @@ -1384,9 +1388,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.sorts() != null) { try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1541,6 +1543,20 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (this.indicesService.getCompositeIndexSettings() != null + && this.indicesService.getCompositeIndexSettings().isStarTreeIndexCreationEnabled() + && StarTreeQueryHelper.isStarTreeSupported(context)) { + try { + StarTreeQueryContext starTreeQueryContext = StarTreeQueryHelper.getStarTreeQueryContext(context, source); + if (starTreeQueryContext != null) { + context.starTreeQueryContext(starTreeQueryContext); + logger.debug("can use star tree"); + } else { + logger.debug("cannot use star tree"); + } + } catch (IOException ignored) {} + } } /** @@ -1700,7 +1716,7 @@ public static boolean 
canMatchSearchAfter( && minMax != null && primarySortField != null && primarySortField.missing() == null - && Objects.equals(trackTotalHitsUpto, SearchContext.TRACK_TOTAL_HITS_DISABLED)) { + && Objects.equals(trackTotalHitsUpto, TRACK_TOTAL_HITS_DISABLED)) { final Object searchAfterPrimary = searchAfter.fields[0]; if (primarySortField.order() == SortOrder.DESC) { if (minMax.compareMin(searchAfterPrimary) > 0) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..720a24da1d9d4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index e58466b56df2a..2970c5ca851e7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -32,11 +32,21 @@ package org.opensearch.search.aggregations.metrics; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -50,6 +60,9 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getStarTreeFilteredValues; +import static 
org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into an average * @@ -93,6 +106,14 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeLeafCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -126,6 +147,59 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree); + assert starTreeValues != null; + + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + String sumMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.SUM.getTypeName() + ); + String countMetricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues( + starTree.getField(), + fieldName, + MetricStat.VALUE_COUNT.getTypeName() + ); + + final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0); + SortedNumericStarTreeValuesIterator sumValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getMetricValuesIterator(sumMetricName); + SortedNumericStarTreeValuesIterator countValueIterator = (SortedNumericStarTreeValuesIterator) starTreeValues + .getMetricValuesIterator(countMetricName); + FixedBitSet matchedDocIds = getStarTreeFilteredValues(context, ctx, starTreeValues); + assert matchedDocIds != null; + + int numBits = matchedDocIds.length(); // Get the length of the FixedBitSet + if (numBits > 0) { + // Iterate over the FixedBitSet + for (int bit = matchedDocIds.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = bit + 1 < numBits + ? 
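Each pre-aggregated metric is materialized as its own doc-values field whose name combines the star-tree field, the source field, and the metric type; the exact format string is owned by StarTreeUtils. Conceptually (field names hypothetical):

// Resolve the doc-values field carrying the "sum" metric of an aggregated field:
String sumMetric = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
    starTree.getField(),            // star-tree field name, e.g. "startree"
    "latency",                      // the aggregated source field (hypothetical)
    MetricStat.SUM.getTypeName()    // "sum"
);
// The name uniquely pairs (star-tree field, source field, metric) and is then resolvable
// via starTreeValues.getMetricValuesIterator(sumMetric).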
matchedDocIds.nextSetBit(bit + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + // Advance to the bit (entryId) in the valuesIterator + if ((sumValuesIterator.advanceExact(bit) && countValueIterator.advanceExact(bit)) == false) { + continue; // Skip if no more entries + } + + // Iterate over the values for the current entryId + for (int i = 0; i < sumValuesIterator.entryValueCount(); i++) { + kahanSummation.add(NumericUtils.sortableLongToDouble(sumValuesIterator.nextValue())); + counts.increment(0, countValueIterator.nextValue()); // Apply the consumer operation (e.g., max, sum) + } + } + } + + sums.set(0, kahanSummation.value()); + return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) { + @Override + public void collect(int doc, long bucket) { + throw new CollectionTerminatedException(); + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java index 0a09fae1eaebe..57389f19b4577 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { +class AvgAggregatorFactory extends MetricAggregatorFactory { AvgAggregatorFactory( String name, @@ -65,6 +65,11 @@ class AvgAggregatorFactory extends ValuesSourceAggregatorFactory { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.AVG; + } + static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( AvgAggregationBuilder.REGISTRY_KEY, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 8108b8a726856..257109bca54bb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -37,9 +37,13 @@ import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import 
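The guarded loop above is the required idiom for FixedBitSet: nextSetBit(i) may only be called with i < length(), and it returns DocIdSetIterator.NO_MORE_DOCS once the set is exhausted. The idiom in isolation:

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.FixedBitSet;

FixedBitSet bits = new FixedBitSet(8);
bits.set(1);
bits.set(5);
int numBits = bits.length();
if (numBits > 0) { // nextSetBit(0) is only legal on a non-empty set
    for (int bit = bits.nextSetBit(0);
         bit != DocIdSetIterator.NO_MORE_DOCS;
         bit = bit + 1 < numBits ? bits.nextSetBit(bit + 1) : DocIdSetIterator.NO_MORE_DOCS) {
        // visits bit == 1, then bit == 5
    }
}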
org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -55,8 +59,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a max value * @@ -120,6 +127,16 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); @@ -143,6 +160,23 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + AtomicReference max = new AtomicReference<>(maxes.get(0)); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.MAX.getTypeName(), + value -> { + max.set(Math.max(max.get(), (NumericUtils.sortableLongToDouble(value)))); + }, + () -> maxes.set(0, max.get()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= maxes.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java index 4fe936c8b7797..c0ee471c87f29 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MaxAggregatorFactory extends ValuesSourceAggregatorFactory { +class MaxAggregatorFactory extends MetricAggregatorFactory { static void 
registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( @@ -74,6 +74,11 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.MAX; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new MaxAggregator(name, config, searchContext, parent, metadata); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java new file mode 100644 index 0000000000000..0ac630cf051d3 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MetricAggregatorFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.metrics; + +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.opensearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.Map; + +/** + * Extending ValuesSourceAggregatorFactory for aggregation factories supported by star-tree implementation + */ +public abstract class MetricAggregatorFactory extends ValuesSourceAggregatorFactory { + public MetricAggregatorFactory( + String name, + ValuesSourceConfig config, + QueryShardContext queryShardContext, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metadata + ) throws IOException { + super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); + } + + public abstract MetricStat getMetricStat(); +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 946057e42ac88..a9f20bdeb5fd5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -37,9 +37,13 @@ import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.NumericDoubleValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; @@ -54,8 +58,11 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static 
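Min and Max decode stored metric values with NumericUtils.sortableLongToDouble because the star tree persists doubles as sortable longs, an order-preserving encoding, so min/max over the encoded longs agrees with min/max over the original doubles. Round trip in isolation:

import org.apache.lucene.util.NumericUtils;

double value = -12.75;
long encoded = NumericUtils.doubleToSortableLong(value);    // order-preserving encoding
assert NumericUtils.sortableLongToDouble(encoded) == value; // exact round trip
assert NumericUtils.doubleToSortableLong(-1.0) < NumericUtils.doubleToSortableLong(2.0);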
org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a min value * @@ -119,6 +126,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc throw new CollectionTerminatedException(); } } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); @@ -138,10 +154,26 @@ public void collect(int doc, long bucket) throws IOException { mins.set(bucket, min); } } - }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + AtomicReference min = new AtomicReference<>(mins.get(0)); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.MIN.getTypeName(), + value -> { + min.set(Math.min(min.get(), (NumericUtils.sortableLongToDouble(value)))); + }, + () -> mins.set(0, min.get()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= mins.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java index 58fbe5edefd12..44c0d9d7d11eb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class MinAggregatorFactory extends ValuesSourceAggregatorFactory { +class MinAggregatorFactory extends MetricAggregatorFactory { static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( @@ -74,6 +74,11 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.MIN; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new MinAggregator(name, config, searchContext, parent, metadata); diff --git 
a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..3d237a94c5699 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -33,9 +33,13 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.NumericUtils; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -49,6 +53,8 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * Aggregate all docs into a single sum value * @@ -89,6 +95,15 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { final BigArrays bigArrays = context.bigArrays(); final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); @@ -118,6 +133,21 @@ public void collect(int doc, long bucket) throws IOException { }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0); + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + valuesSource, + ctx, + sub, + starTree, + MetricStat.SUM.getTypeName(), + value -> kahanSummation.add(NumericUtils.sortableLongToDouble(value)), + () -> sums.set(0, kahanSummation.value()) + ); + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e2e25a8c25a87 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import 
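SumAggregator seeds the CompensatedSum with the running total so the star-tree path keeps the same Kahan-style error compensation as the default collector. The core of the technique in standalone form (a sketch of the algorithm, not the CompensatedSum source):

double sum = 0.0, compensation = 0.0;
for (double value : new double[] { 1e16, 1.0, 1.0 }) {
    double corrected = value - compensation;
    double next = sum + corrected;
    compensation = (next - sum) - corrected; // the low-order part that plain addition rounded away
    sum = next;
}
// sum == 1.0000000000000002E16, the exact result; naive left-to-right addition yields 1.0E16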
org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +class SumAggregatorFactory extends MetricAggregatorFactory { SumAggregatorFactory( String name, @@ -65,6 +65,11 @@ class SumAggregatorFactory extends ValuesSourceAggregatorFactory { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.SUM; + } + static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register( SumAggregationBuilder.REGISTRY_KEY, diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java index 6f9be06231819..a156ec49983fa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java @@ -37,6 +37,9 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.LongArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper; import org.opensearch.index.fielddata.MultiGeoPointValues; import org.opensearch.index.fielddata.SortedBinaryDocValues; import org.opensearch.search.aggregations.Aggregator; @@ -50,6 +53,8 @@ import java.io.IOException; import java.util.Map; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper.getSupportedStarTree; + /** * A field data based aggregator that counts the number of values a specific field has within the aggregation context. *
@@ -88,6 +93,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final BigArrays bigArrays = context.bigArrays(); if (valuesSource instanceof ValuesSource.Numeric) { + + CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context); + if (supportedStarTree != null) { + return getStarTreeCollector(ctx, sub, supportedStarTree); + } + final SortedNumericDocValues values = ((ValuesSource.Numeric) valuesSource).longValues(ctx); return new LeafBucketCollectorBase(sub, values) { @@ -124,10 +135,23 @@ public void collect(int doc, long bucket) throws IOException { counts.increment(bucket, values.docValueCount()); } } - }; } + public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + return StarTreeQueryHelper.getStarTreeLeafCollector( + context, + (ValuesSource.Numeric) valuesSource, + ctx, + sub, + starTree, + MetricStat.VALUE_COUNT.getTypeName(), + value -> counts.increment(0, value), + () -> {} + ); + } + @Override public double metric(long owningBucketOrd) { return (valuesSource == null || owningBucketOrd >= counts.size()) ? 0 : counts.get(owningBucketOrd); diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java index 4a04dd2e0a932..0c82279484461 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregatorFactory.java @@ -32,13 +32,13 @@ package org.opensearch.search.aggregations.metrics; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.query.QueryShardContext; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.CardinalityUpperBound; import org.opensearch.search.aggregations.support.CoreValuesSourceType; -import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.SearchContext; @@ -51,7 +51,7 @@ * * @opensearch.internal */ -class ValueCountAggregatorFactory extends ValuesSourceAggregatorFactory { +class ValueCountAggregatorFactory extends MetricAggregatorFactory { public static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(ValueCountAggregationBuilder.REGISTRY_KEY, CoreValuesSourceType.ALL_CORE, ValueCountAggregator::new, true); @@ -68,6 +68,11 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) { super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata); } + @Override + public MetricStat getMetricStat() { + return MetricStat.VALUE_COUNT; + } + @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { return new ValueCountAggregator(name, config, searchContext, parent, metadata); diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- 
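Note the asymmetry with the other metrics: the value_count column already holds per-entry counts, not sortable-long-encoded doubles, so the consumer adds the stored value directly with no decoding. A toy equivalent (values hypothetical):

long[] perEntryCounts = { 3, 7, 2 };   // pre-aggregated value_count column for matched tree entries
long total = 0;
for (long count : perEntryCounts) {
    total += count;                    // add stored counts; nothing to decode
}
// total == 12, the same result as counting the individual values those entries summarize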
a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..d862b2c2784de 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,8 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 4b1b720a1aed7..641efdec015eb 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -76,6 +76,7 @@ import org.opensearch.search.query.ReduceableSearchResult; import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.search.startree.StarTreeQueryContext; import org.opensearch.search.suggest.SuggestionSearchContext; import java.util.Collection; @@ -124,8 +125,8 @@ public List toInternalAggregations(Collection co private final List releasables = new CopyOnWriteArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; - private volatile boolean searchTimedOut; + private StarTreeQueryContext starTreeQueryContext; protected SearchContext() {} @@ -535,4 +536,12 @@ public boolean keywordIndexOrDocValuesEnabled() { return false; } + public SearchContext starTreeQueryContext(StarTreeQueryContext starTreeQueryContext) { + this.starTreeQueryContext = starTreeQueryContext; + return this; + } + + public StarTreeQueryContext getStarTreeQueryContext() { + return this.starTreeQueryContext; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..f7fa210691678 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Filter operator for star tree data structure. + * + * @opensearch.experimental + * @opensearch.internal + */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** + * First go over the star tree and try to match as many dimensions as possible + * For the remaining columns, use star-tree doc values to match them + */ + public static FixedBitSet getStarTreeResult(StarTreeValues starTreeValues, Map predicateEvaluators) throws IOException { + Map queryMap = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + StarTreeResult starTreeResult = traverseStarTree(starTreeValues, queryMap); + + // Initialize FixedBitSet with size maxMatchedDoc + 1 + FixedBitSet bitSet = new FixedBitSet(starTreeResult.maxMatchedDoc + 1); + SortedNumericStarTreeValuesIterator starTreeValuesIterator = new SortedNumericStarTreeValuesIterator( + starTreeResult.matchedDocIds.build().iterator() + ); + + // No matches, return an empty FixedBitSet + if (starTreeResult.maxMatchedDoc == -1) { + return bitSet; + } + + // Set bits in FixedBitSet for initially matched documents + while (starTreeValuesIterator.nextEntry() != NO_MORE_DOCS) { + bitSet.set(starTreeValuesIterator.entryId()); + } + + // Temporary FixedBitSet reused for filtering + FixedBitSet tempBitSet = new FixedBitSet(starTreeResult.maxMatchedDoc + 1); + + // Process remaining predicate columns to further filter the results + for (String remainingPredicateColumn : starTreeResult.remainingPredicateColumns) { + logger.debug("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + + SortedNumericStarTreeValuesIterator ndv = (SortedNumericStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator( + remainingPredicateColumn + ); + + long queryValue = queryMap.get(remainingPredicateColumn); // Get the query value directly + + // Clear the temporary bit set before reuse + tempBitSet.clear(0, starTreeResult.maxMatchedDoc + 1); + + if (bitSet.length() > 0) { + // Iterate over the current set of matched document IDs + for (int entryId = bitSet.nextSetBit(0); entryId != DocIdSetIterator.NO_MORE_DOCS; entryId = (entryId + 1 < bitSet.length()) + ? 
bitSet.nextSetBit(entryId + 1) + : DocIdSetIterator.NO_MORE_DOCS) { + if (ndv.advance(entryId) != StarTreeValuesIterator.NO_MORE_ENTRIES) { + final int valuesCount = ndv.entryValueCount(); + for (int i = 0; i < valuesCount; i++) { + long value = ndv.nextValue(); + // Compare the value with the query value + if (value == queryValue) { + tempBitSet.set(entryId); // Set bit for the matching entryId + break; // No need to check other values for this entryId + } + } + } + } + } + + // Perform intersection of the current matches with the temp results for this predicate + bitSet.and(tempBitSet); + } + + return bitSet; // Return the final FixedBitSet with all matches + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private static StarTreeResult traverseStarTree(StarTreeValues starTreeValues, Map queryMap) throws IOException { + DocIdSetBuilder docsWithField = new DocIdSetBuilder(starTreeValues.getStarTreeDocumentCount()); + DocIdSetBuilder.BulkAdder adder; + Set globalRemainingPredicateColumns = null; + StarTreeNode starTree = starTreeValues.getRoot(); + List dimensionNames = starTreeValues.getStarTreeField() + .getDimensionsOrder() + .stream() + .map(Dimension::getField) + .collect(Collectors.toList()); + boolean foundLeafNode = starTree.isLeaf(); + assert foundLeafNode == false; // root node is never leaf + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(queryMap.keySet()); + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + String childDimension = dimensionNames.get(dimensionId + 1); + StarTreeNode starNode = null; + if (globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension)) { + starNode = starTreeNode.getChildStarNode(); + } + + if (remainingPredicateColumns.contains(childDimension)) { + long queryValue = queryMap.get(childDimension); // Get the query value directly from the map + StarTreeNode matchingChild = starTreeNode.getChildForDimensionValue(queryValue); + if (matchingChild != null) { + queue.add(matchingChild); + foundLeafNode |= matchingChild.isLeaf(); + } + } else { + if (starNode != null) { + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getStarTreeNodeType() != StarTreeNodeType.STAR.getValue()) { + 
queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } + + /** + * Helper class to wrap the result from traversing the star tree. + * */ + private static class StarTreeResult { + public final DocIdSetBuilder matchedDocIds; + public final Set remainingPredicateColumns; + public final int numOfMatchedDocs; + public final int maxMatchedDoc; + + public StarTreeResult( + DocIdSetBuilder matchedDocIds, + Set remainingPredicateColumns, + int numOfMatchedDocs, + int maxMatchedDoc + ) { + this.matchedDocIds = matchedDocIds; + this.remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java new file mode 100644 index 0000000000000..cda3a25b30e53 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQueryContext.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.util.FixedBitSet; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; + +import java.util.Map; + +/** + * Query class for querying star tree data structure. 
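StarTreeFilter in brief: phase one walks the tree, satisfying predicates in dimension order and collecting aggregated document IDs; phase two re-checks whatever predicates the walk could not consume against per-dimension doc values, intersecting bitsets. A condensed restatement (bitSetOf and checkDimensionDocValues are hypothetical helpers standing in for the inline code above):

StarTreeResult result = traverseStarTree(starTreeValues, queryMap);      // phase 1: prune subtrees
FixedBitSet matches = bitSetOf(result.matchedDocIds);                    // entries the walk kept
for (String dimension : result.remainingPredicateColumns) {              // phase 2: residual filters
    FixedBitSet stillMatching = checkDimensionDocValues(dimension, queryMap.get(dimension), matches);
    matches.and(stillMatching);                                          // intersect per predicate
}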
+ * + * @opensearch.experimental + */ +@ExperimentalApi +public class StarTreeQueryContext { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + private final CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a value to be queried for that field + * This is used to filter the data based on the query + */ + private final Map queryMap; + + /** + * Cache for leaf results + * This is used to cache the results for each leaf reader context + * to avoid reading the filtered values from the leaf reader context multiple times + */ + private final FixedBitSet[] starTreeValues; + + public StarTreeQueryContext(CompositeIndexFieldInfo starTree, Map queryMap, int numSegmentsCache) { + this.starTree = starTree; + this.queryMap = queryMap; + if (numSegmentsCache > -1) { + starTreeValues = new FixedBitSet[numSegmentsCache]; + } else { + starTreeValues = null; + } + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } + + public Map getQueryMap() { + return queryMap; + } + + public FixedBitSet[] getStarTreeValues() { + return starTreeValues; + } + + public FixedBitSet getStarTreeValues(LeafReaderContext ctx) { + if (starTreeValues != null) { + return starTreeValues[ctx.ord]; + } + return null; + } + + public void setStarTreeValues(LeafReaderContext ctx, FixedBitSet values) { + if (starTreeValues != null) { + starTreeValues[ctx.ord] = values; + } + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..601a588e54e69 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
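The FixedBitSet[] cache is indexed by LeafReaderContext.ord, so a segment's filter result is computed at most once per query. A plausible call pattern (a sketch of what StarTreeQueryHelper.getStarTreeFilteredValues presumably does; context, ctx, and starTreeValues are assumed in scope):

StarTreeQueryContext queryContext = context.getStarTreeQueryContext();
FixedBitSet matched = queryContext.getStarTreeValues(ctx);  // null on cache miss or when disabled
if (matched == null) {
    matched = StarTreeFilter.getStarTreeResult(starTreeValues, queryContext.getQueryMap());
    queryContext.setStarTreeValues(ctx, matched);           // no-op when the cache is disabled
}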
+ */ + +/** Star Tree query classes */ +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/opensearch/action/resync/ResyncReplicationRequestTests.java index 654dbb203b38a..9faaafc22c844 100644 --- a/server/src/test/java/org/opensearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/opensearch/action/resync/ResyncReplicationRequestTests.java @@ -40,14 +40,14 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import static org.hamcrest.Matchers.equalTo; public class ResyncReplicationRequestTests extends OpenSearchTestCase { public void testSerialization() throws IOException { - final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); + final byte[] bytes = "{}".getBytes(StandardCharsets.UTF_8); final Translog.Index index = new Translog.Index("id", 0, randomNonNegativeLong(), randomNonNegativeLong(), bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[] { index }); @@ -61,4 +61,18 @@ public void testSerialization() throws IOException { assertThat(after, equalTo(before)); } + public void testContractBetweenEqualsAndHashCode() { + final byte[] bytes = "{}".getBytes(StandardCharsets.UTF_8); + final Translog.Index index = new Translog.Index("id", 0, 123L, -123L, bytes, null, -1); + final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + // Both created requests have arrays `operations` with the same content, and we want to verify that + // equals() and hashCode() are following the contract: + // If objects are equal, they have the same hash code + final ResyncReplicationRequest request1 = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[] { index }); + final ResyncReplicationRequest request2 = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[] { index }); + + assertEquals(request1, request2); + assertEquals(request1.hashCode(), request2.hashCode()); + } + } diff --git a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java index d35fc6b111c9f..f081cadc1362c 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeDocValuesFormatTests.java @@ -109,7 +109,7 @@ protected Codec getCodec() { final Logger testLogger = LogManager.getLogger(StarTreeDocValuesFormatTests.class); try { - createMapperService(getExpandedMapping()); + mapperService = createMapperService(getExpandedMapping()); } catch (IOException e) { throw new RuntimeException(e); } @@ -307,7 +307,7 @@ public void testStarTreeDocValuesWithDeletions() throws IOException { directory.close(); } - private XContentBuilder getExpandedMapping() throws IOException { + public static XContentBuilder getExpandedMapping() throws IOException { return topMapping(b -> { b.startObject("composite"); b.startObject("startree"); @@ -361,13 +361,13 @@ private XContentBuilder getExpandedMapping() throws IOException { }); } - private XContentBuilder topMapping(CheckedConsumer 
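The contract this new test pins down is the classic array pitfall: Objects.hash applied to an array field hashes the array reference, so two equal objects can report different hash codes, while Arrays.hashCode hashes the contents. In isolation:

import java.util.Arrays;
import java.util.Objects;

String[] a = { "op" };
String[] b = { "op" };
// reference-based: distinct-but-equal arrays almost always hash differently
boolean broken = Objects.hash((Object) a) == Objects.hash((Object) b);   // false (typically)
// content-based: equal arrays hash equally, satisfying the equals/hashCode contract
boolean fixed = Arrays.hashCode(a) == Arrays.hashCode(b);                // true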
buildFields) throws IOException { + public static XContentBuilder topMapping(CheckedConsumer buildFields) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc"); buildFields.accept(builder); return builder.endObject().endObject(); } - private void createMapperService(XContentBuilder builder) throws IOException { + public static MapperService createMapperService(XContentBuilder builder) throws IOException { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -377,7 +377,7 @@ private void createMapperService(XContentBuilder builder) throws IOException { .build(); IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).putMapping(builder.toString()).build(); IndicesModule indicesModule = new IndicesModule(Collections.emptyList()); - mapperService = MapperTestUtils.newMapperServiceWithHelperAnalyzer( + MapperService mapperService = MapperTestUtils.newMapperServiceWithHelperAnalyzer( new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), createTempDir(), settings, @@ -385,5 +385,6 @@ private void createMapperService(XContentBuilder builder) throws IOException { "test" ); mapperService.merge(indexMetadata, MapperService.MergeReason.INDEX_TEMPLATE); + return mapperService; } } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java b/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java new file mode 100644 index 0000000000000..0c88154ca2b38 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/SearchServiceStarTreeTests.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search; + +import org.opensearch.action.OriginalIndices; +import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.Strings; +import org.opensearch.index.IndexService; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.compositeindex.CompositeIndexSettings; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings; +import org.opensearch.index.mapper.CompositeMappedFieldType; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.ReaderContext; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.startree.StarTreeQueryContext; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; + +public class SearchServiceStarTreeTests extends OpenSearchSingleNodeTestCase { + + public void testParseQueryToOriginalOrStarTreeQuery() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + setStarTreeIndexSetting("true"); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey(), true) + .build(); + CreateIndexRequestBuilder builder = client().admin() + .indices() + .prepareCreate("test") + .setSettings(settings) + .setMapping(StarTreeDocValuesFormatTests.getExpandedMapping()); + createIndex("test", builder); + + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); + IndexShard indexShard = indexService.getShard(0); + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + new SearchRequest().allowPartialSearchResults(true), + indexShard.shardId(), + 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, + -1, + null, + null + ); + + // Case 1: No query or aggregations, should not use star tree + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 2: MatchAllQuery present but no aggregations, should not use star tree + sourceBuilder = new SearchSourceBuilder().query(new MatchAllQueryBuilder()); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 3: MatchAllQuery and aggregations present, should use star tree + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new MatchAllQueryBuilder()) + .aggregation(AggregationBuilders.max("test").field("field")); + CompositeIndexFieldInfo 
expectedStarTree = new CompositeIndexFieldInfo( + "startree", + CompositeMappedFieldType.CompositeFieldType.STAR_TREE + ); + Map expectedQueryMap = null; + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, -1), -1); + + // Case 4: MatchAllQuery and aggregations present, but postFilter specified, should not use star tree + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new MatchAllQueryBuilder()) + .aggregation(AggregationBuilders.max("test").field("field")) + .postFilter(new MatchAllQueryBuilder()); + assertStarTreeContext(request, sourceBuilder, null, -1); + + // Case 5: TermQuery and single aggregation, should use star tree, but not initialize query cache + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new TermQueryBuilder("sndv", 1)) + .aggregation(AggregationBuilders.max("test").field("field")); + expectedQueryMap = Map.of("sndv", 1L); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, -1), -1); + + // Case 6: TermQuery and multiple aggregations present, should use star tree & initialize cache + sourceBuilder = new SearchSourceBuilder().size(0) + .query(new TermQueryBuilder("sndv", 1)) + .aggregation(AggregationBuilders.max("test").field("field")) + .aggregation(AggregationBuilders.sum("test2").field("field")); + expectedQueryMap = Map.of("sndv", 1L); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, 0), 0); + + // Case 7: No query, metric aggregations present, should use star tree + sourceBuilder = new SearchSourceBuilder().size(0).aggregation(AggregationBuilders.max("test").field("field")); + assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, null, -1), -1); + + setStarTreeIndexSetting(null); + } + + private void setStarTreeIndexSetting(String value) throws IOException { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING.getKey(), value).build()) + .execute(); + } + + private void assertStarTreeContext( + ShardSearchRequest request, + SearchSourceBuilder sourceBuilder, + StarTreeQueryContext expectedContext, + int expectedCacheUsage + ) throws IOException { + request.source(sourceBuilder); + SearchService searchService = getInstanceFromNode(SearchService.class); + try (ReaderContext reader = searchService.createOrGetReaderContext(request, false)) { + SearchContext context = searchService.createContext(reader, request, null, true); + StarTreeQueryContext actualContext = context.getStarTreeQueryContext(); + + if (expectedContext == null) { + assertThat(context.getStarTreeQueryContext(), nullValue()); + } else { + assertThat(actualContext, notNullValue()); + assertEquals(expectedContext.getStarTree().getType(), actualContext.getStarTree().getType()); + assertEquals(expectedContext.getStarTree().getField(), actualContext.getStarTree().getField()); + assertEquals(expectedContext.getQueryMap(), actualContext.getQueryMap()); + if (expectedCacheUsage > -1) { + assertEquals(expectedCacheUsage, actualContext.getStarTreeValues().length); + } else { + assertNull(actualContext.getStarTreeValues()); + } + } + searchService.doStop(); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java new file mode 
100644 index 0000000000000..0327bd9990784 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/aggregations/startree/MetricAggregatorTests.java @@ -0,0 +1,317 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.startree; + +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene912.Lucene912Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.composite912.Composite912Codec; +import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.NumericDimension; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.aggregations.InternalAggregation; +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.InternalAvg; +import org.opensearch.search.aggregations.metrics.InternalMax; +import org.opensearch.search.aggregations.metrics.InternalMin; +import org.opensearch.search.aggregations.metrics.InternalSum; +import org.opensearch.search.aggregations.metrics.InternalValueCount; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; +import org.opensearch.search.aggregations.metrics.SumAggregationBuilder; +import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import static org.opensearch.search.aggregations.AggregationBuilders.avg; +import static org.opensearch.search.aggregations.AggregationBuilders.count; +import static org.opensearch.search.aggregations.AggregationBuilders.max; +import static org.opensearch.search.aggregations.AggregationBuilders.min; 
+import static org.opensearch.search.aggregations.AggregationBuilders.sum; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; + +public class MetricAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "field"; + private static final NumberFieldMapper.NumberType DEFAULT_FIELD_TYPE = NumberFieldMapper.NumberType.LONG; + private static final MappedFieldType DEFAULT_MAPPED_FIELD = new NumberFieldMapper.NumberFieldType(FIELD_NAME, DEFAULT_FIELD_TYPE); + + @Before + public void setup() { + FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build()); + } + + @After + public void teardown() throws IOException { + FeatureFlags.initializeFeatureFlags(Settings.EMPTY); + } + + protected Codec getCodec() { + final Logger testLogger = LogManager.getLogger(MetricAggregatorTests.class); + MapperService mapperService; + try { + mapperService = StarTreeDocValuesFormatTests.createMapperService(StarTreeDocValuesFormatTests.getExpandedMapping()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new Composite912Codec(Lucene912Codec.Mode.BEST_SPEED, mapperService, testLogger); + } + + public void testStarTreeDocValues() throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setCodec(getCodec()); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + + Random random = RandomizedTest.getRandom(); + int totalDocs = 100; + final String SNDV = "sndv"; + final String DV = "dv"; + int val; + + List docs = new ArrayList<>(); + // Index 100 random documents + for (int i = 0; i < totalDocs; i++) { + Document doc = new Document(); + if (random.nextBoolean()) { + val = random.nextInt(10) - 5; // Random long between -5 and 4 + doc.add(new SortedNumericDocValuesField(SNDV, val)); + } + if (random.nextBoolean()) { + val = random.nextInt(20) - 10; // Random long between -10 and 9 + doc.add(new SortedNumericDocValuesField(DV, val)); + } + if (random.nextBoolean()) { + val = random.nextInt(50); // Random long between 0 and 49 + doc.add(new SortedNumericDocValuesField(FIELD_NAME, val)); + } + iw.addDocument(doc); + docs.add(doc); + } + + if (randomBoolean()) { + iw.forceMerge(1); + } + iw.close(); + + DirectoryReader ir = DirectoryReader.open(directory); + initValuesSourceRegistry(); + LeafReaderContext context = ir.leaves().get(0); + + SegmentReader reader = Lucene.segmentReader(context.reader()); + IndexSearcher indexSearcher = newSearcher(reader, false, false); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + CompositeIndexFieldInfo starTree = compositeIndexFields.get(0); + + SumAggregationBuilder sumAggregationBuilder = sum("_name").field(FIELD_NAME); + MaxAggregationBuilder maxAggregationBuilder = max("_name").field(FIELD_NAME); + MinAggregationBuilder minAggregationBuilder = min("_name").field(FIELD_NAME); + ValueCountAggregationBuilder valueCountAggregationBuilder = count("_name").field(FIELD_NAME); + AvgAggregationBuilder avgAggregationBuilder = avg("_name").field(FIELD_NAME); + + List supportedDimensions = new LinkedList<>(); + supportedDimensions.add(new NumericDimension(SNDV)); + supportedDimensions.add(new NumericDimension(DV)); + + Query query = new MatchAllDocsQuery(); + // match-all query + QueryBuilder 
+        QueryBuilder queryBuilder = null; // no predicates
+        testCase(
+            indexSearcher,
+            query,
+            queryBuilder,
+            sumAggregationBuilder,
+            starTree,
+            supportedDimensions,
+            verifyAggregation(InternalSum::getValue)
+        );
+        testCase(
+            indexSearcher,
+            query,
+            queryBuilder,
+            maxAggregationBuilder,
+            starTree,
+            supportedDimensions,
+            verifyAggregation(InternalMax::getValue)
+        );
+        testCase(
+            indexSearcher,
+            query,
+            queryBuilder,
+            minAggregationBuilder,
+            starTree,
+            supportedDimensions,
+            verifyAggregation(InternalMin::getValue)
+        );
+        testCase(
+            indexSearcher,
+            query,
+            queryBuilder,
+            valueCountAggregationBuilder,
+            starTree,
+            supportedDimensions,
+            verifyAggregation(InternalValueCount::getValue)
+        );
+        testCase(
+            indexSearcher,
+            query,
+            queryBuilder,
+            avgAggregationBuilder,
+            starTree,
+            supportedDimensions,
+            verifyAggregation(InternalAvg::getValue)
+        );
+
+        // Numeric-terms query
+        for (int cases = 0; cases < 100; cases++) {
+            String queryField;
+            long queryValue;
+            if (randomBoolean()) {
+                queryField = SNDV;
+                queryValue = random.nextInt(10);
+            } else {
+                queryField = DV;
+                queryValue = random.nextInt(20) - 15;
+            }
+
+            query = SortedNumericDocValuesField.newSlowExactQuery(queryField, queryValue);
+            queryBuilder = new TermQueryBuilder(queryField, queryValue);
+
+            testCase(
+                indexSearcher,
+                query,
+                queryBuilder,
+                sumAggregationBuilder,
+                starTree,
+                supportedDimensions,
+                verifyAggregation(InternalSum::getValue)
+            );
+            testCase(
+                indexSearcher,
+                query,
+                queryBuilder,
+                maxAggregationBuilder,
+                starTree,
+                supportedDimensions,
+                verifyAggregation(InternalMax::getValue)
+            );
+            testCase(
+                indexSearcher,
+                query,
+                queryBuilder,
+                minAggregationBuilder,
+                starTree,
+                supportedDimensions,
+                verifyAggregation(InternalMin::getValue)
+            );
+            testCase(
+                indexSearcher,
+                query,
+                queryBuilder,
+                valueCountAggregationBuilder,
+                starTree,
+                supportedDimensions,
+                verifyAggregation(InternalValueCount::getValue)
+            );
+            testCase(
+                indexSearcher,
+                query,
+                queryBuilder,
+                avgAggregationBuilder,
+                starTree,
+                supportedDimensions,
+                verifyAggregation(InternalAvg::getValue)
+            );
+        }
+
+        ir.close();
+        directory.close();
+    }
+
+    <T extends Number, V extends InternalAggregation> BiConsumer<V, V> verifyAggregation(Function<V, T> valueExtractor) {
+        return (expectedAggregation, actualAggregation) -> assertEquals(
+            valueExtractor.apply(expectedAggregation).doubleValue(),
+            valueExtractor.apply(actualAggregation).doubleValue(),
+            0.0f
+        );
+    }
+
+    private <T extends AggregationBuilder, V extends InternalAggregation> void testCase(
+        IndexSearcher searcher,
+        Query query,
+        QueryBuilder queryBuilder,
+        T aggBuilder,
+        CompositeIndexFieldInfo starTree,
+        List<Dimension> supportedDimensions,
+        BiConsumer<V, V> verify
+    ) throws IOException {
+        V starTreeAggregation = searchAndReduceStarTree(
+            createIndexSettings(),
+            searcher,
+            query,
+            queryBuilder,
+            aggBuilder,
+            starTree,
+            supportedDimensions,
+            DEFAULT_MAX_BUCKETS,
+            false,
+            DEFAULT_MAPPED_FIELD
+        );
+        V expectedAggregation = searchAndReduceStarTree(
+            createIndexSettings(),
+            searcher,
+            query,
+            queryBuilder,
+            aggBuilder,
+            null,
+            null,
+            DEFAULT_MAX_BUCKETS,
+            false,
+            DEFAULT_MAPPED_FIELD
+        );
+        verify.accept(expectedAggregation, starTreeAggregation);
+    }
+}
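Editor's note: each case in the test above runs twice through `testCase`, once with the star-tree and once on the default execution path, and asserts both results agree. As a usage sketch for anyone adding a case (hypothetical values; assumes the locals of `testStarTreeDocValues` are in scope):

```java
// Sketch: verify a sum aggregation under a term filter on the "sndv" dimension.
// The Lucene query drives the regular execution path; the TermQueryBuilder is
// what the star-tree query context resolves into a dimension filter.
Query q = SortedNumericDocValuesField.newSlowExactQuery(SNDV, 3);
QueryBuilder qb = new TermQueryBuilder(SNDV, 3);
testCase(indexSearcher, q, qb, sum("_name").field(FIELD_NAME), starTree, supportedDimensions, verifyAggregation(InternalSum::getValue));
```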
diff --git a/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java b/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java
new file mode 100644
index 0000000000000..f8eb71a40319a
--- /dev/null
+++ b/server/src/test/java/org/opensearch/search/aggregations/startree/StarTreeFilterTests.java
@@ -0,0 +1,319 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.startree;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.lucene912.Lucene912Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.tests.index.RandomIndexWriter;
+import org.apache.lucene.util.FixedBitSet;
+import org.opensearch.common.lucene.Lucene;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
+import org.opensearch.index.codec.composite.CompositeIndexReader;
+import org.opensearch.index.codec.composite.composite912.Composite912Codec;
+import org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests;
+import org.opensearch.index.compositeindex.datacube.MetricStat;
+import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
+import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper;
+import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
+import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
+import org.opensearch.index.mapper.MapperService;
+import org.opensearch.search.aggregations.AggregatorTestCase;
+import org.opensearch.search.startree.StarTreeFilter;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.index.codec.composite912.datacube.startree.StarTreeDocValuesFormatTests.topMapping;
+
+public class StarTreeFilterTests extends AggregatorTestCase {
+
+    private static final String FIELD_NAME = "field";
+    private static final String SNDV = "sndv";
+    private static final String SDV = "sdv";
+    private static final String DV = "dv";
+
+    @Before
+    public void setup() {
+        FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build());
+    }
+
+    @After
+    public void teardown() throws IOException {
+        FeatureFlags.initializeFeatureFlags(Settings.EMPTY);
+    }
+
+    protected Codec getCodec(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) {
+        final Logger testLogger = LogManager.getLogger(StarTreeFilterTests.class);
+        MapperService mapperService;
+        try {
+            mapperService = StarTreeDocValuesFormatTests.createMapperService(
+                getExpandedMapping(maxLeafDoc, skipStarNodeCreationForSDVDimension)
+            );
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return new Composite912Codec(Lucene912Codec.Mode.BEST_SPEED, mapperService, testLogger);
+    }
+
+    public void testStarTreeFilterWithNoDocsInSDVField() throws IOException {
+        testStarTreeFilter(5, true);
+    }
+
+    public void testStarTreeFilterWithDocsInSDVFieldButNoStarNode() throws IOException {
+        testStarTreeFilter(10, false);
+    }
+
+    private void testStarTreeFilter(int maxLeafDoc, boolean skipStarNodeCreationForSDVDimension) throws IOException {
+        Directory directory = newDirectory();
+        IndexWriterConfig conf = newIndexWriterConfig(null);
+        conf.setCodec(getCodec(maxLeafDoc, skipStarNodeCreationForSDVDimension));
+        conf.setMergePolicy(newLogMergePolicy());
+        RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf);
+        int totalDocs = 100;
+
+        List<Document> docs = new ArrayList<>();
+        for (int i = 0; i < totalDocs; i++) {
+            Document doc = new Document();
+            doc.add(new SortedNumericDocValuesField(SNDV, i));
+            doc.add(new SortedNumericDocValuesField(DV, 2 * i));
+            doc.add(new SortedNumericDocValuesField(FIELD_NAME, 3 * i));
+            if (skipStarNodeCreationForSDVDimension) {
+                // add the SDV field only when star-node creation is skipped for the SDV dimension
+                doc.add(new SortedNumericDocValuesField(SDV, 4 * i));
+            }
+            iw.addDocument(doc);
+            docs.add(doc);
+        }
+        iw.forceMerge(1);
+        iw.close();
+
+        DirectoryReader ir = DirectoryReader.open(directory);
+        initValuesSourceRegistry();
+        LeafReaderContext context = ir.leaves().get(0);
+        SegmentReader reader = Lucene.segmentReader(context.reader());
+        CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
+
+        long starTreeDocCount, docCount;
+
+        // assert that all documents are included if no filters are given
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(), context);
+        docCount = getDocCount(docs, Map.of());
+        assertEquals(totalDocs, starTreeDocCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // single filter - matches docs
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L), context);
+        docCount = getDocCount(docs, Map.of(SNDV, 0L));
+        assertEquals(1, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // single filter on 3rd field in ordered dimension - matches docs
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(DV, 0L), context);
+        docCount = getDocCount(docs, Map.of(DV, 0L));
+        assertEquals(1, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // single filter - does not match docs
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 101L), context);
+        docCount = getDocCount(docs, Map.of(SNDV, 101L));
+        assertEquals(0, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // single filter on 3rd field in ordered dimension - does not match docs
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(DV, -101L), context);
+        docCount = getDocCount(docs, Map.of(DV, -101L));
+        assertEquals(0, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // multiple filters - matches docs
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, 0L), context);
+        docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, 0L));
+        assertEquals(1, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // no document should match the filter
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, -11L), context);
+        docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, -11L));
+        assertEquals(0, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // Only the first filter should match some documents, second filter matches none
+        starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SNDV, 0L, DV, -100L), context);
+        docCount = getDocCount(docs, Map.of(SNDV, 0L, DV, -100L));
+        assertEquals(0, docCount);
+        assertEquals(docCount, starTreeDocCount);
+
+        // non-dimension fields in filter - should throw IllegalArgumentException
+        expectThrows(
+            IllegalArgumentException.class,
+            () -> getDocCountFromStarTree(starTreeDocValuesReader, Map.of(FIELD_NAME, 0L), context)
+        );
+
+        if (skipStarNodeCreationForSDVDimension) {
+            // SDV was indexed on every document, so the filter should match exactly one document
+            starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SDV, 4L), context);
+            docCount = getDocCount(docs, Map.of(SDV, 4L));
+            assertEquals(1, docCount);
+            assertEquals(docCount, starTreeDocCount);
+        } else {
+            // SDV was never added to the documents, so the filter should match nothing
+            starTreeDocCount = getDocCountFromStarTree(starTreeDocValuesReader, Map.of(SDV, 4L), context);
+            docCount = getDocCount(docs, Map.of(SDV, 4L));
+            assertEquals(0, docCount);
+            assertEquals(docCount, starTreeDocCount);
+        }
+
+        ir.close();
+        directory.close();
+    }
+
+    // Counts the documents that contain the SNDV field and match all the given filters
+    private long getDocCount(List<Document> documents, Map<String, Long> filters) {
+        long count = 0;
+        for (Document doc : documents) {
+            // Check if SNDV field is present
+            IndexableField sndvField = doc.getField(SNDV);
+            if (sndvField == null) continue; // Skip if SNDV is not present
+
+            // Apply filters if provided
+            if (!filters.isEmpty()) {
+                boolean matches = filters.entrySet().stream().allMatch(entry -> {
+                    IndexableField field = doc.getField(entry.getKey());
+                    return field != null && field.numericValue().longValue() == entry.getValue();
+                });
+                if (!matches) continue;
+            }
+
+            // Increment count if the document passes all conditions
+            count++;
+        }
+        return count;
+    }
+
+    // Returns the count of documents in the star tree that have the SNDV field and match the applied filters
+    private long getDocCountFromStarTree(CompositeIndexReader starTreeDocValuesReader, Map<String, Long> filters, LeafReaderContext context)
+        throws IOException {
+        List<CompositeIndexFieldInfo> compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields();
+        CompositeIndexFieldInfo starTree = compositeIndexFields.get(0);
+        StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(context, starTree);
+        FixedBitSet filteredValues = StarTreeFilter.getStarTreeResult(starTreeValues, filters);
+
+        SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(
+            StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
+                starTree.getField(),
+                SNDV,
+                MetricStat.VALUE_COUNT.getTypeName()
+            )
+        );
+
+        long docCount = 0;
+        int numBits = filteredValues.length();
+        if (numBits > 0) {
+            for (int bit = filteredValues.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
+                ? filteredValues.nextSetBit(bit + 1)
+                : DocIdSetIterator.NO_MORE_DOCS) {
+
+                // Assert that we can advance to the document ID in the values iterator
+                boolean canAdvance = valuesIterator.advanceExact(bit);
+                assert canAdvance : "Cannot advance to document ID " + bit + " in values iterator.";
+
+                // Iterate over values for the current document ID
+                for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
+                    long value = valuesIterator.nextValue();
+                    // Accumulate the value_count metric recorded for the current document
+                    docCount += value;
+                }
+            }
+        }
+        return docCount;
+    }
+
+    public static XContentBuilder getExpandedMapping(int maxLeafDocs, boolean skipStarNodeCreationForSDVDimension) throws IOException {
+        return topMapping(b -> {
+            b.startObject("composite");
+            b.startObject("startree");
+            b.field("type", "star_tree");
+            b.startObject("config");
+            b.field("max_leaf_docs", maxLeafDocs);
+            if (skipStarNodeCreationForSDVDimension) {
+                b.startArray("skip_star_node_creation_for_dimensions");
+                b.value("sdv");
+                b.endArray();
+            }
+            b.startArray("ordered_dimensions");
+            b.startObject();
+            b.field("name", "sndv");
+            b.endObject();
+            b.startObject();
+            b.field("name", "sdv");
+            b.endObject();
+            b.startObject();
+            b.field("name", "dv");
+            b.endObject();
+            b.endArray();
+            b.startArray("metrics");
+            b.startObject();
+            b.field("name", "field");
+            b.startArray("stats");
+            b.value("sum");
+            b.value("value_count");
+            b.value("avg");
+            b.value("min");
+            b.value("max");
+            b.endArray();
+            b.endObject();
+            b.startObject();
+            b.field("name", "sndv");
+            b.startArray("stats");
+            b.value("sum");
+            b.value("value_count");
+            b.value("avg");
+            b.value("min");
+            b.value("max");
+            b.endArray();
+            b.endObject();
+            b.endArray();
+            b.endObject();
+            b.endObject();
+            b.endObject();
+            b.startObject("properties");
+            b.startObject("sndv");
+            b.field("type", "integer");
+            b.endObject();
+            b.startObject("sdv");
+            b.field("type", "integer");
+            b.endObject();
+            b.startObject("dv");
+            b.field("type", "integer");
+            b.endObject();
+            b.startObject("field");
+            b.field("type", "integer");
+            b.endObject();
+            b.endObject();
+        });
+    }
+}
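Editor's note: for orientation when reading `getExpandedMapping`, the builder calls above correspond roughly to the JSON below (a hand-reconstruction for `maxLeafDocs = 5` with star-node creation skipped for `sdv`; the outer wrapper added by `topMapping`, defined in `StarTreeDocValuesFormatTests`, is not shown):

```java
// Reading aid only, reconstructed from the XContentBuilder calls above;
// not output captured from the test.
String approximateMapping = """
    {
      "composite": {
        "startree": {
          "type": "star_tree",
          "config": {
            "max_leaf_docs": 5,
            "skip_star_node_creation_for_dimensions": ["sdv"],
            "ordered_dimensions": [{ "name": "sndv" }, { "name": "sdv" }, { "name": "dv" }],
            "metrics": [
              { "name": "field", "stats": ["sum", "value_count", "avg", "min", "max"] },
              { "name": "sndv", "stats": ["sum", "value_count", "avg", "min", "max"] }
            ]
          }
        }
      },
      "properties": {
        "sndv": { "type": "integer" },
        "sdv": { "type": "integer" },
        "dv": { "type": "integer" },
        "field": { "type": "integer" }
      }
    }
    """;
```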
diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
index 4abd7fbea9cff..e1728c4476699 100644
--- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
@@ -91,11 +91,16 @@
 import org.opensearch.index.cache.bitset.BitsetFilterCache;
 import org.opensearch.index.cache.bitset.BitsetFilterCache.Listener;
 import org.opensearch.index.cache.query.DisabledQueryCache;
+import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
+import org.opensearch.index.compositeindex.datacube.Dimension;
+import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeQueryHelper;
 import org.opensearch.index.fielddata.IndexFieldData;
 import org.opensearch.index.fielddata.IndexFieldDataCache;
 import org.opensearch.index.fielddata.IndexFieldDataService;
 import org.opensearch.index.mapper.BinaryFieldMapper;
 import org.opensearch.index.mapper.CompletionFieldMapper;
+import org.opensearch.index.mapper.CompositeDataCubeFieldType;
+import org.opensearch.index.mapper.CompositeMappedFieldType;
 import org.opensearch.index.mapper.ConstantKeywordFieldMapper;
 import org.opensearch.index.mapper.ContentPath;
 import org.opensearch.index.mapper.DateFieldMapper;
@@ -117,6 +122,7 @@
 import org.opensearch.index.mapper.RangeType;
 import org.opensearch.index.mapper.StarTreeMapper;
 import org.opensearch.index.mapper.TextFieldMapper;
+import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryShardContext;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.SearchOperationListener;
@@ -135,12 +141,14 @@
 import org.opensearch.search.aggregations.support.CoreValuesSourceType;
 import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
 import org.opensearch.search.aggregations.support.ValuesSourceType;
+import org.opensearch.search.builder.SearchSourceBuilder;
 import org.opensearch.search.fetch.FetchPhase;
 import org.opensearch.search.fetch.subphase.FetchDocValuesPhase;
 import org.opensearch.search.fetch.subphase.FetchSourcePhase;
 import org.opensearch.search.internal.ContextIndexSearcher;
 import org.opensearch.search.internal.SearchContext;
 import org.opensearch.search.lookup.SearchLookup;
+import org.opensearch.search.startree.StarTreeQueryContext;
 import org.opensearch.test.InternalAggregationTestCase;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.After;
@@ -155,6 +163,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -331,6 +340,35 @@ protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregati
         return aggregator;
     }
 
+    protected CountingAggregator createCountingAggregator(
+        Query query,
+        QueryBuilder queryBuilder,
+        AggregationBuilder aggregationBuilder,
+        IndexSearcher indexSearcher,
+        IndexSettings indexSettings,
+        CompositeIndexFieldInfo starTree,
+        List<Dimension> supportedDimensions,
+        MultiBucketConsumer bucketConsumer,
+        MappedFieldType... fieldTypes
+    ) throws IOException {
+        SearchContext searchContext;
+        if (starTree != null) {
+            searchContext = createSearchContextWithStarTreeContext(
+                indexSearcher,
+                indexSettings,
+                query,
+                queryBuilder,
+                starTree,
+                supportedDimensions,
+                bucketConsumer,
+                fieldTypes
+            );
+        } else {
+            searchContext = createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, fieldTypes);
+        }
+        return new CountingAggregator(new AtomicInteger(), createAggregator(aggregationBuilder, searchContext));
+    }
+
     /**
      * Create a {@linkplain SearchContext} for testing an {@link Aggregator}.
     */
@@ -344,6 +382,49 @@ protected SearchContext createSearchContext(
         return createSearchContext(indexSearcher, indexSettings, query, bucketConsumer, new NoneCircuitBreakerService(), fieldTypes);
     }
 
+    protected SearchContext createSearchContextWithStarTreeContext(
+        IndexSearcher indexSearcher,
+        IndexSettings indexSettings,
+        Query query,
+        QueryBuilder queryBuilder,
+        CompositeIndexFieldInfo starTree,
+        List<Dimension> supportedDimensions,
+        MultiBucketConsumer bucketConsumer,
+        MappedFieldType... fieldTypes
+    ) throws IOException {
+        SearchContext searchContext = createSearchContext(
+            indexSearcher,
+            indexSettings,
+            query,
+            bucketConsumer,
+            new NoneCircuitBreakerService(),
+            fieldTypes
+        );
+
+        // Mock SearchContextAggregations
+        SearchContextAggregations searchContextAggregations = mock(SearchContextAggregations.class);
+        AggregatorFactories aggregatorFactories = mock(AggregatorFactories.class);
+        when(searchContext.aggregations()).thenReturn(searchContextAggregations);
+        when(searchContextAggregations.factories()).thenReturn(aggregatorFactories);
+        when(aggregatorFactories.getFactories()).thenReturn(new AggregatorFactory[] {});
+
+        CompositeDataCubeFieldType compositeMappedFieldType = mock(CompositeDataCubeFieldType.class);
+        when(compositeMappedFieldType.name()).thenReturn(starTree.getField());
+        when(compositeMappedFieldType.getCompositeIndexType()).thenReturn(starTree.getType());
+        Set<CompositeMappedFieldType> compositeFieldTypes = Set.of(compositeMappedFieldType);
+
+        when((compositeMappedFieldType).getDimensions()).thenReturn(supportedDimensions);
+        MapperService mapperService = mock(MapperService.class);
+        when(mapperService.getCompositeFieldTypes()).thenReturn(compositeFieldTypes);
+        when(searchContext.mapperService()).thenReturn(mapperService);
+
+        SearchSourceBuilder sb = new SearchSourceBuilder().query(queryBuilder);
+        StarTreeQueryContext starTreeQueryContext = StarTreeQueryHelper.getStarTreeQueryContext(searchContext, sb);
+
+        when(searchContext.getStarTreeQueryContext()).thenReturn(starTreeQueryContext);
+        return searchContext;
+    }
+
     protected SearchContext createSearchContext(
         IndexSearcher indexSearcher,
         IndexSettings indexSettings,
@@ -651,6 +732,67 @@ protected <A extends InternalAggregation> A searchAndReduc
         return internalAgg;
     }
 
+    protected <A extends InternalAggregation> A searchAndReduceStarTree(
+        IndexSettings indexSettings,
+        IndexSearcher searcher,
+        Query query,
+        QueryBuilder queryBuilder,
+        AggregationBuilder builder,
+        CompositeIndexFieldInfo compositeIndexFieldInfo,
+        List<Dimension> supportedDimensions,
+        int maxBucket,
+        boolean hasNested,
+        MappedFieldType... fieldTypes
+    ) throws IOException {
+        query = query.rewrite(searcher);
+        final IndexReaderContext ctx = searcher.getTopReaderContext();
+        final PipelineTree pipelines = builder.buildPipelineTree();
+        List<InternalAggregation> aggs = new ArrayList<>();
+        if (hasNested) {
+            query = Queries.filtered(query, Queries.newNonNestedFilter());
+        }
+
+        MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(
+            maxBucket,
+            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+        );
+        CountingAggregator countingAggregator = createCountingAggregator(
+            query,
+            queryBuilder,
+            builder,
+            searcher,
+            indexSettings,
+            compositeIndexFieldInfo,
+            supportedDimensions,
+            bucketConsumer,
+            fieldTypes
+        );
+
+        countingAggregator.preCollection();
+        searcher.search(query, countingAggregator);
+        countingAggregator.postCollection();
+        aggs.add(countingAggregator.buildTopLevel());
+        if (compositeIndexFieldInfo != null) {
+            assertEquals(0, countingAggregator.collectCounter.get());
+        }
+
+        MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(
+            maxBucket,
+            new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
+        );
+        InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
+            countingAggregator.context().bigArrays(),
+            getMockScriptService(),
+            reduceBucketConsumer,
+            pipelines
+        );
+
+        @SuppressWarnings("unchecked")
+        A internalAgg = (A) aggs.get(0).reduce(aggs, context);
+        doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer);
+        return internalAgg;
+    }
+
     protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
         InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
     }
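Editor's note on the zero-collect assertion above: when `compositeIndexFieldInfo` is non-null, the expectation is that the star-tree path answers the aggregation entirely from pre-computed values, so the wrapped aggregator's per-document `collect()` hook must never fire. As a minimal illustrative sketch of the counting idea (not the framework's actual `CountingAggregator`, which lives in `AggregatorTestCase` and wraps a whole `Aggregator`; the class below and its use of `LeafBucketCollectorBase` are assumptions for illustration):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;

// Illustrative sketch: wrap a leaf collector so every per-document collect()
// call is counted. When the star-tree context serves the aggregation from
// pre-computed values, this counter stays at zero, which is what
// searchAndReduceStarTree asserts via countingAggregator.collectCounter.
final class CountingLeafCollector extends LeafBucketCollectorBase {
    private final AtomicInteger collectCounter;

    CountingLeafCollector(LeafBucketCollector delegate, AtomicInteger collectCounter) {
        super(delegate, null); // second argument is an optional values source, unused here
        this.collectCounter = collectCounter;
    }

    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        collectCounter.incrementAndGet(); // count the per-document collection
        super.collect(doc, owningBucketOrd); // then delegate to the real collector
    }
}
```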