From 577d333aef57f71ca2144a156301602b4b43be1c Mon Sep 17 00:00:00 2001 From: Ronnak Saxena <63483386+ronnaksaxena@users.noreply.github.com> Date: Sat, 30 Sep 2023 10:26:45 -0700 Subject: [PATCH] Rollup Search API : Searching both historical rollup and non-rollup data (#901) * added response interceptor Signed-off-by: Ronnak Saxena * Base case: Query Live and Rollup data with no overlap Signed-off-by: Ronnak Saxena * finished base case and added integ test Signed-off-by: Ronnak Saxena * added to response interceptor Signed-off-by: Ronnak Saxena * can rewrite request to bucket pipeline Signed-off-by: Ronnak Saxena * trying to rewrite aggregations in a helper function Signed-off-by: Ronnak Saxena * able to create new aggreations but getting shardIndex is not set error Signed-off-by: Ronnak Saxena * Can find start and end times for rollup and live index Signed-off-by: Ronnak Saxena * Handles overlap between 1 live index and 1 rollup index for sum aggregation Signed-off-by: Ronnak Saxena * added min max aggregations and fixed intersection time calculation Signed-off-by: Ronnak Saxena * changed variable name in computeAggregationsWithoutOverlap Signed-off-by: Ronnak Saxena * Added integ tests for nonoverlapping case Signed-off-by: Ronnak Saxena * added avg and value count aggregation Signed-off-by: Ronnak Saxena * fixed ktlint and integ test Signed-off-by: Ronnak Saxena * changed test and build workflow Signed-off-by: Ronnak Saxena * added integ test for multiple live indices Signed-off-by: Ronnak Saxena * added test case for alias live indices Signed-off-by: Ronnak Saxena * cleaned up code and moved functions to utils file Signed-off-by: Ronnak Saxena * fixed detekt errors Signed-off-by: Ronnak Saxena * fixed ktlint error :/' Signed-off-by: Ronnak Saxena * Can run all integ tests at once now Signed-off-by: Ronnak Saxena * removed DateTimeFromatter Signed-off-by: Ronnak Saxena * fixed inf interceptor loop, need to pass RollupInterceptorIT.test rollup search multiple target indices failed test Signed-off-by: Ronnak Saxena * passes all integ tests Signed-off-by: Ronnak Saxena * fixed detekt errors Signed-off-by: Ronnak Saxena * deleted rest test Signed-off-by: Ronnak Saxena * added test back Signed-off-by: Ronnak Saxena * trying a new workflow build Signed-off-by: Ronnak Saxena * added stars to worklow files Signed-off-by: Ronnak Saxena * added unit test Signed-off-by: Ronnak Saxena * resolved some PR comments from Bowen Signed-off-by: Ronnak Saxena * resolved more comments on my PR Signed-off-by: Ronnak Saxena * removed stars from workflows Signed-off-by: Ronnak Saxena * testing time of alias test case Signed-off-by: Ronnak Saxena * boring, took too long Signed-off-by: Ronnak Saxena * commented out last 2 tests Signed-off-by: Ronnak Saxena * removed all response interceptor tests Signed-off-by: Ronnak Saxena * added one test back Signed-off-by: Ronnak Saxena * fixed data stream integ tests Signed-off-by: Ronnak Saxena * commented out breaking tests Signed-off-by: Ronnak Saxena * added tests back Signed-off-by: Ronnak Saxena * removed countdown latch and added coroutines Signed-off-by: Ronnak Saxena * resolved comments on the PR Signed-off-by: Ronnak Saxena * resolved PR comments added kotlin docs for methods Signed-off-by: Ronnak Saxena --------- Signed-off-by: Ronnak Saxena --- .../workflows/multi-node-test-workflow.yml | 2 +- .github/workflows/test-and-build-workflow.yml | 2 +- .../indexmanagement/IndexManagementPlugin.kt | 5 +- .../rollup/interceptor/ResponseInterceptor.kt | 429 ++++++++++ 
.../rollup/interceptor/RollupInterceptor.kt | 260 +++++- .../rollup/util/RollupUtils.kt | 83 ++ .../interceptor/ResponseInterceptorIT.kt | 753 ++++++++++++++++++ .../rollup/interceptor/RollupInterceptorIT.kt | 15 - .../resthandler/RestDeleteRollupActionIT.kt | 1 - .../rollup/util/RollupUtilsTests.kt | 61 ++ 10 files changed, 1556 insertions(+), 55 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index aaa37dc98..0bb068e78 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -3,7 +3,7 @@ name: Multi node test workflow on: pull_request: branches: - - "*" + - "**" push: branches: - "*" diff --git a/.github/workflows/test-and-build-workflow.yml b/.github/workflows/test-and-build-workflow.yml index d324148e8..639ea274f 100644 --- a/.github/workflows/test-and-build-workflow.yml +++ b/.github/workflows/test-and-build-workflow.yml @@ -2,7 +2,7 @@ name: Test and Build Workflow on: pull_request: branches: - - "*" + - "**" push: branches: - "*" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index bea66041f..7db5aba11 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -113,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAc import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter +import org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptor import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata @@ -208,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin lateinit var clusterService: ClusterService lateinit var indexNameExpressionResolver: IndexNameExpressionResolver lateinit var rollupInterceptor: RollupInterceptor + lateinit var responseInterceptor: ResponseInterceptor lateinit var fieldCapsFilter: FieldCapsFilter lateinit var indexMetadataProvider: IndexMetadataProvider private val indexMetadataServices: MutableList> = mutableListOf() @@ -391,6 +393,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin environment ) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) + responseInterceptor = ResponseInterceptor(clusterService, settings, indexNameExpressionResolver, client) val jvmService = JvmService(environment.settings()) val transformRunner = TransformRunner.initialize( client, @@ -612,7 +615,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin } override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List { - return listOf(rollupInterceptor) + return listOf(rollupInterceptor, responseInterceptor) } override fun getActionFilters(): List { diff --git 
a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt new file mode 100644 index 000000000..608fc3959 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt @@ -0,0 +1,429 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.interceptor + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.search.DocValueFormat +import org.opensearch.search.aggregations.InternalAggregation +import org.opensearch.search.aggregations.InternalAggregations +import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogram +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.internal.ShardSearchRequest +import org.opensearch.search.query.QuerySearchResult +import org.opensearch.index.query.QueryBuilders +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.rollup.util.convertDateStringToEpochMillis +import org.opensearch.indexmanagement.rollup.util.convertFixedIntervalStringToMs +import org.opensearch.indexmanagement.rollup.util.getRollupJobs +import org.opensearch.indexmanagement.rollup.util.isRollupIndex +import org.opensearch.indexmanagement.rollup.util.zonedDateTimeToMillis +import org.opensearch.search.aggregations.metrics.InternalAvg +import org.opensearch.search.aggregations.metrics.InternalMax +import org.opensearch.search.aggregations.metrics.InternalMin +import org.opensearch.search.aggregations.metrics.InternalScriptedMetric +import org.opensearch.search.aggregations.metrics.InternalSum +import org.opensearch.search.aggregations.metrics.InternalValueCount +import org.opensearch.search.sort.SortBuilders +import org.opensearch.search.sort.SortOrder +import org.opensearch.transport.Transport +import org.opensearch.transport.TransportException +import org.opensearch.transport.TransportInterceptor +import org.opensearch.transport.TransportRequest +import org.opensearch.transport.TransportRequestOptions +import org.opensearch.transport.TransportResponse +import org.opensearch.transport.TransportResponseHandler +import java.time.ZonedDateTime +import kotlin.math.max +import kotlin.math.min + +/** +* The Response Interceptor class modifies responses if the search API is triggered on rollup and live data +* 1. The class checks if the request was rewritten in the RollupInterceptor into more granular buckets +* 2. In findOverlap it checks for overlap between live data and rollup data and returns the interval to include +* 3. computeAggregationsWithoutOverlap() iterates through the buckets and recomputes the aggregations in the expected format +* 4.
Returns a new response for each shard to be combined later + **/ +class ResponseInterceptor( + val clusterService: ClusterService, + val settings: Settings, + val indexNameExpressionResolver: IndexNameExpressionResolver, + val client: Client +) : TransportInterceptor, + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("Rollup Response Interceptor")) { + private val logger = LogManager.getLogger(javaClass) + override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender { + return CustomAsyncSender(sender) + } + + private inner class CustomAsyncSender(private val originalSender: TransportInterceptor.AsyncSender) : TransportInterceptor.AsyncSender { + override fun <T : TransportResponse> sendRequest( + connection: Transport.Connection, + action: String, + request: TransportRequest, + options: TransportRequestOptions, + handler: TransportResponseHandler<T> + ) { + val interceptedHandler = CustomResponseHandler(handler) + + originalSender.sendRequest(connection, action, request, options, interceptedHandler) + } + } + + @Suppress("TooManyFunctions") + private inner class CustomResponseHandler<T : TransportResponse>( + private val originalHandler: TransportResponseHandler<T>? + ) : TransportResponseHandler<T> { + override fun read(inStream: StreamInput?): T { + val response = originalHandler?.read(inStream) + return response!! + } + + /** + * Check if this response was modified in the request interceptor + * and should be put back together + * @param QuerySearchResult + * @return Boolean + */ + fun isRewrittenInterceptorRequest(response: QuerySearchResult): Boolean { + val currentAggregations = response.aggregations().expand() + for (agg in currentAggregations) { + if (agg.name == "interceptor_interval_data") { + return true + } + } + return false + } + @Suppress("SpreadOperator") + fun getRollupJob(response: QuerySearchResult): Rollup? { + val originalRequest = response.shardSearchRequest!! + val indices = originalRequest.indices().map { it.toString() }.toTypedArray() + val allIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), originalRequest.indicesOptions(), *indices) + for (index in allIndices) { + if (isRollupIndex(index, clusterService.state())) { + return clusterService.state().metadata.index(index).getRollupJobs()?.get(0)!!
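// Editor's note: only the first rollup job on the first rollup index encountered is returned here;
// the overlap computation below therefore assumes every rollup index in the request was written by
// a job with the same date histogram dimension (an implicit constraint of this patch, not enforced in code).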
+ } + } + return null + } + @Suppress("SpreadOperator") + fun getRollupAndLiveIndices(request: ShardSearchRequest): Pair<Array<String>, Array<String>> { + val liveIndices = mutableListOf<String>() + val rollupIndices = mutableListOf<String>() + val indices = request.indices().map { it.toString() }.toTypedArray() + val concreteIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + for (indexName in concreteIndices) { + if (isRollupIndex(indexName, clusterService.state())) { + rollupIndices.add(indexName) + } else { + liveIndices.add(indexName) + } + } + return Pair(rollupIndices.toTypedArray(), liveIndices.toTypedArray()) + } + + /** + * Calculates the end time for the current shard index if it is a rollup index with overlapping data + * @params liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String + * @return Long + **/ + @Suppress("SpreadOperator") + suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long { + // Build search request to find the maximum rollup timestamp <= liveDataStartPoint + val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) + val query = QueryBuilders.boolQuery() + .must(QueryBuilders.rangeQuery(dateTargetField).lte(liveDataStartPoint)) + val searchSourceBuilder = SearchSourceBuilder() + .sort(sort) + .query(query) + .size(1) + // Need to avoid infinite interceptor loop + val req = SearchRequest() + .source(searchSourceBuilder) + .indices(*rollupIndices) + val res: SearchResponse? = client.suspendUntil { search(req, it) } + try { + return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long + } catch (e: Exception) { + logger.error("Not able to retrieve intersection time from response: ", e) + } + return 0L // fallback: no rollup end time found, so no overlap is removed + } + /** + * Checks for overlap in time series data and returns the non-overlapping interval to include + * Note startRange is inclusive and endRange is exclusive; they are Longs holding epoch milliseconds + * @param QuerySearchResult + * @return Pair(startRange: Long, endRange: Long) + **/ + // TODO intercept at the index level instead of the shard level to avoid redundant client calls for every index + @Suppress("LongMethod", "SpreadOperator") + suspend fun findOverlap(response: QuerySearchResult): Pair<Long, Long> { + // TODO add more error logs and try catch statements for client calls + val job: Rollup = getRollupJob(response)!! + var dateSourceField: String = "" + var dateTargetField: String = "" + var rollupInterval: String? = "" + for (dim in job.dimensions) { + if (dim is DateHistogram) { + dateSourceField = dim.sourceField + dateTargetField = dim.targetField + rollupInterval = dim.fixedInterval + break + } + } + val request: ShardSearchRequest = response.shardSearchRequest!!
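// Editor's note: a worked example of the values extracted above, using the rollup job from the
// integration tests later in this patch (illustrative values, not part of the change itself):
//   dateSourceField = "tpep_pickup_datetime"  // raw timestamp field in the live indices
//   dateTargetField = "tpep_pickup_datetime"  // stored in rollup docs under "tpep_pickup_datetime.date_histogram"
//   rollupInterval  = "1h"                    // the DateHistogram dimension's fixedInterval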
+ val oldQuery = request.source().query() + val (rollupIndices, liveIndices) = getRollupAndLiveIndices(request) + val shardRequestIndex = request.shardId().indexName + val isShardIndexRollup = isRollupIndex(shardRequestIndex, clusterService.state()) + // Build search request to find the maximum date in all rollup indices + var sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) + var searchSourceBuilder = SearchSourceBuilder() + .sort(sort) + .query(oldQuery) + .size(1) + // Need to avoid infinite interceptor loop + val maxRollupDateRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(*rollupIndices) // add all rollup indices to this request + logger.debug("Sending maxRollupDate request for $shardRequestIndex") + val maxRollupDateResponse: SearchResponse? = client.suspendUntil { search(maxRollupDateRequest, it) } + // Build search request to find the minimum date in all live indices + sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC) + searchSourceBuilder = SearchSourceBuilder() + .sort(sort) + .size(1) + val minLiveDateRequest = SearchRequest() + .source(searchSourceBuilder) + /* + If the response shard index is a rollup index, need to find the minimum value of all the live indices to compute the overlap + This is because I am comparing this index to all the live data to compute the interval I want to keep + If the response shard index is a live index, need to only compute minimum value of the current shard index + */ + if (isShardIndexRollup) { + minLiveDateRequest.indices(*liveIndices) + } else { // shard index is live index + minLiveDateRequest.indices(shardRequestIndex) + } + logger.debug("Sending minLiveData request for $shardRequestIndex") + var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) } + val foundMinAndMax = (minLiveDateResponse != null && maxRollupDateResponse != null) + // if they overlap find part to exclude + if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRollupDateResponse!!.hits.hits.isNotEmpty()) { + // Rollup data ends at maxRolledDate + fixedInterval + val maxRolledDate: Long = maxRollupDateResponse.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long + val rollupDataEndPoint = maxRolledDate + convertFixedIntervalStringToMs(fixedInterval = rollupInterval!!) 
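// Editor's note: illustrative arithmetic for the end point computed above. If the newest rollup
// bucket key is maxRolledDate = 1546300800000 (2019-01-01T00:00:00Z) and the job's fixedInterval
// is "1h" (3_600_000 ms), then rollupDataEndPoint = 1546304400000, i.e. the rollup data covers
// timestamps strictly before 2019-01-01T01:00:00Z -- the same split point the tests below use.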
+ val minLiveDate = minLiveDateResponse.hits.hits[0].sourceAsMap.get("$dateSourceField") as String + val liveDataStartPoint = convertDateStringToEpochMillis(minLiveDate) + // If intersection found on rollup index, remove overlap + if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) { + // Start at 0, end at live data + logger.debug("Sending request to find rollup endtime for index: $shardRequestIndex") + val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField) + return Pair(0L, endTime) + } + } + // No overlap or is live data index so start and end include everything + return Pair(0L, Long.MAX_VALUE) + } + + /** + * Depending on which metric the aggregation is, computation is different + * @params agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any + * @return Pair + */ + @Suppress("ReturnCount") + fun computeRunningValue(agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any): Pair { + when (agg) { + is InternalSum -> { + return Pair(agg.value + (currentValue as Double), agg.type) + } + is InternalMax -> { + return Pair(max(agg.value, (currentValue as Double)), agg.type) + } + is InternalMin -> { + return Pair(min(agg.value, (currentValue as Double)), agg.type) + } + is InternalValueCount -> { // Live data uses this + return Pair(agg.value + (currentValue as Long), agg.type) + } + is InternalScriptedMetric -> { + // Rollup InternalValueCount + return Pair((agg.aggregation() as Long) + (currentValue as Long), "value_count") + } + else -> throw IllegalArgumentException("This aggregation is not currently supported in rollups searches: ${agg.name}") + } + } + // Depending on which metric the aggregation is return a different start value + @Suppress("ReturnCount") + fun getAggComputationStartValue(agg: org.opensearch.search.aggregations.Aggregation): Pair { + when (agg) { + is InternalSum -> return Pair(agg.value, agg.type) + is InternalMax -> return Pair(agg.value, agg.type) + is InternalMin -> return Pair(agg.value, agg.type) + is InternalValueCount -> return Pair(agg.value, agg.type) // Live data + is InternalScriptedMetric -> return Pair(agg.aggregation(), "value_count") // Rollup data + else -> throw IllegalArgumentException("This aggregation is not currently supported in rollups searches: ${agg.name}") + } + } + @Suppress("ReturnCount") + fun createNewMetricAgg(aggName: String, aggValue: Any, aggType: String): InternalAggregation { + when (aggType) { + "sum" -> return InternalSum(aggName, (aggValue as Double), DocValueFormat.RAW, null) + "min" -> return InternalMin(aggName, (aggValue as Double), DocValueFormat.RAW, null) + "max" -> return InternalMax(aggName, (aggValue as Double), DocValueFormat.RAW, null) + "value_count" -> return InternalValueCount(aggName, (aggValue as Long), null) + else -> throw IllegalArgumentException("Could not recreate an aggregation for type $aggType") + } + } + /** + * Create original avg aggregation + * @return InternalAvg + */ + + fun initRollupAvgAgg( + modifiedName: String, + value: Any, + aggValues: MutableMap>, + addedAggregations: MutableSet + ): InternalAvg { + // Sum calc + if (modifiedName.contains(".rollup.avg.sum")) { + // Won't double count + addedAggregations += modifiedName + val originalName = modifiedName.removeSuffix(".rollup.avg.sum") + val avgSum: Double = value as Double + for ((aggName, data) in aggValues) { + // Found value count component to create InternalAvg object + if (!addedAggregations.contains(aggName) && aggName.contains(originalName)) { + 
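// Editor's note: sketch of how the two rewritten halves recombine here, using a hypothetical
// aggregation originally named "avg_passenger_count":
//   "avg_passenger_count.rollup.avg.sum"         -> avgSum   = 10.0
//   "avg_passenger_count.rollup.avg.value_count" -> avgCount = 4
//   InternalAvg("avg_passenger_count", 10.0, 4, ...) reports value 2.5 (sum / count).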
addedAggregations += aggName + val (avgCount, _) = data + return InternalAvg(originalName, avgSum, (avgCount as Long), DocValueFormat.RAW, null) + } + } + } else { // Value count calc + // Put in set to avoid adding the aggregation twice + addedAggregations += modifiedName + val originalName = modifiedName.removeSuffix(".rollup.avg.value_count") + val avgCount = value as Long + for ((aggName, data) in aggValues) { + // Found sum component to create InternalAvg object + if (!addedAggregations.contains(aggName) && aggName.contains(originalName)) { + addedAggregations += aggName + val (avgSum, _) = data + return InternalAvg(originalName, (avgSum as Double), avgCount, DocValueFormat.RAW, null) + } + } + } + throw NullPointerException("Can't calculate avg agg for rollup index") + } + + /** + * Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed + * @params intervalAggregations: InternalAggregations, start: Long, end: Long + * @return InternalAggregations + */ + @Suppress("NestedBlockDepth") + fun computeAggregationsWithoutOverlap(intervalAggregations: InternalAggregations, start: Long, end: Long): InternalAggregations { + // Store the running values of the aggregations being computed + // {aggName: String: Pair} + val aggValues = mutableMapOf>() + + // Iterate through each aggregation and bucket + val interceptorAgg = intervalAggregations.asMap().get("interceptor_interval_data") as InternalDateHistogram + for (bucket in interceptorAgg.buckets) { + val zdt = bucket.key as ZonedDateTime + val timestamp: Long = zonedDateTimeToMillis(zdt) + // Only consider buckets within the specified range + // Start is inclusive and end is exclusive + if (timestamp >= start && timestamp < end) { + for (originalAgg in bucket.aggregations) { + val aggName = originalAgg.name + if (aggValues.containsKey(aggName)) { + // Compute running calculation + val (currentValue, _) = aggValues[aggName]!! + aggValues[aggName] = computeRunningValue(originalAgg!!, currentValue) + } else { + aggValues[aggName] = getAggComputationStartValue(originalAgg) + } + } + } + } + + // Create a new InternalAggregations with recomputed values discarding the overlap + val allAggregations = mutableListOf() + val addedAggregations = mutableSetOf() // avoid repeating the same aggregations + for ((aggName, data) in aggValues) { + if (addedAggregations.contains(aggName)) continue + // special case to compute value_count for rollup indices + else if (aggName.contains(".rollup.value_count")) { + val (value, _) = data + val originalName = aggName.removeSuffix(".rollup.value_count") + allAggregations.add(InternalValueCount(originalName, value as Long, null)) + addedAggregations += aggName + } + // special case to compute avg agg using sum and value_count calculation + else if (aggName.contains(".rollup.avg.sum") || aggName.contains(".rollup.avg.value_count")) { + val (value, _) = data + allAggregations.add(initRollupAvgAgg(aggName, value, aggValues, addedAggregations)) + } else { // Sum, Min, or Max agg + val (value, type) = data + val newAgg = createNewMetricAgg(aggName, value, type) + allAggregations.add(newAgg) + addedAggregations += aggName + } + } + return InternalAggregations(allAggregations, null) + } + @Suppress("UNCHECKED_CAST") + override fun handleResponse(response: T?) 
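// Editor's note: entry point for every transport response on this node. Only QuerySearchResults
// whose aggregations were rewritten by the RollupInterceptor are recomputed below; every other
// response is delegated to the original handler untouched.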
{ + // Handle the response if it came from interceptor + when (response) { + // live index + is QuerySearchResult -> { + if (response.hasAggs() && isRewrittenInterceptorRequest(response)) { + launch { + // Check for overlap + val (startTime, endTime) = findOverlap(response) + // Modify agg to be original result without overlap computed in + response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime)) + originalHandler?.handleResponse(response) + } + } else { + originalHandler?.handleResponse(response) + } + } else -> { + // Delegate to original handler + originalHandler?.handleResponse(response) + } + } + } + + override fun handleException(exp: TransportException?) { + // Handle exceptions or delegate to the original handler + originalHandler?.handleException(exp) + } + + override fun executor(): String { + return originalHandler?.executor() ?: "" + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index ffd1e4bd7..5cb9a8b46 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -11,39 +11,43 @@ import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.TermQueryBuilder +import org.opensearch.index.query.TermsQueryBuilder +import org.opensearch.index.query.RangeQueryBuilder +import org.opensearch.index.query.MatchAllQueryBuilder import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.BoostingQueryBuilder import org.opensearch.index.query.ConstantScoreQueryBuilder import org.opensearch.index.query.DisMaxQueryBuilder -import org.opensearch.index.query.MatchAllQueryBuilder import org.opensearch.index.query.MatchPhraseQueryBuilder -import org.opensearch.index.query.QueryBuilder import org.opensearch.index.query.QueryStringQueryBuilder -import org.opensearch.index.query.RangeQueryBuilder -import org.opensearch.index.query.TermQueryBuilder -import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.index.search.MatchQuery +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping.Companion.UNKNOWN_MAPPING import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.RollupSettings -import org.opensearch.indexmanagement.rollup.util.getDateHistogram -import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex +import org.opensearch.indexmanagement.rollup.util.getRollupJobs +import org.opensearch.indexmanagement.rollup.util.changeAggregations import org.opensearch.indexmanagement.rollup.util.populateFieldMappings +import org.opensearch.indexmanagement.rollup.util.getDateHistogram import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder import 
org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.search.aggregations.AggregatorFactories +import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.AggregationBuilders +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder -import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder +import org.opensearch.search.aggregations.metrics.SumAggregationBuilder import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder +import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder import org.opensearch.search.aggregations.metrics.MinAggregationBuilder -import org.opensearch.search.aggregations.metrics.SumAggregationBuilder import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder import org.opensearch.search.internal.ShardSearchRequest import org.opensearch.tasks.Task @@ -51,11 +55,11 @@ import org.opensearch.transport.TransportChannel import org.opensearch.transport.TransportInterceptor import org.opensearch.transport.TransportRequest import org.opensearch.transport.TransportRequestHandler - +@Suppress("TooManyFunctions") class RollupInterceptor( val clusterService: ClusterService, val settings: Settings, - val indexNameExpressionResolver: IndexNameExpressionResolver + val indexNameExpressionResolver: IndexNameExpressionResolver, ) : TransportInterceptor { private val logger = LogManager.getLogger(javaClass) @@ -72,7 +76,145 @@ class RollupInterceptor( } } + /** + * Checks if any of the indices in the original request contain a rollup index + * @param ShardSearchRequest + * @return Pair<Boolean, Rollup?> + */ @Suppress("SpreadOperator") + private fun originalSearchContainsRollup(request: ShardSearchRequest): Pair<Boolean, Rollup?> { // isRollupIndex throws an exception if the request is on a data stream + val indices = request.indices().map { it.toString() }.toTypedArray() + val allIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + for (index in allIndices) { + if (isRollupIndex(index, clusterService.state())) { + val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) + return Pair(true, rollupJob) + } + } + return Pair(false, null) + } + /** + * Returns true if the request was already modified into the "interceptor_interval_data" bucket aggregation + * @param ShardSearchRequest + * @return Boolean + */ + fun isRequestRewrittenIntoBuckets(request: ShardSearchRequest): Boolean { + val currentAggs = request.source().aggregations().aggregatorFactories + if (currentAggs != null) { + for (agg in currentAggs) { + if (agg.name == "interceptor_interval_data") { + return true + } + } + } + return false + } + /** + * Helper function to avoid rewriting a rollup request an extra time + * @param ShardSearchRequest + * @return Boolean + */ + fun isRequestRollupFormat(request: ShardSearchRequest): Boolean { + if (request.source().query() != null) { + val jsonRequest: String = request.source().query().toString() + // Detected dummy field from internal search request + if (jsonRequest.contains("rollup._id")) { + return true + } + } + return false + } + + /** + * If the request has a sort on it, size can be > 0 on a rollup search. + * Context: the ResponseInterceptor has to make client calls to find the min and max timestamps of the rollup and live indices; + * those calls would otherwise be intercepted here and rewritten. + * + * So the idea is to explicitly allow such calls through the rollup interceptor without rewriting them, + * so that they work whether they come from the response interceptor or from a client. + * @param ShardSearchRequest + * @return Boolean + */ + fun allowRequest(request: ShardSearchRequest): Boolean { + return request.source().sorts() != null + } + /** + * Modifies avg and value count aggs for rollup docs. + * Context: in order to recompute the avg metric in the response interceptor, + * the avg request is modified into a sum and a value count request; + * the ResponseInterceptor then puts the aggs back together into an avg aggregation. + * @param aggFacts MutableCollection<AggregationBuilder> + * @return AggregatorFactories.Builder + */ + fun modifyRollupAggs(aggFacts: MutableCollection<AggregationBuilder>): AggregatorFactories.Builder { + val build = AggregatorFactories.builder() + for (agg in aggFacts) { + when (agg) { + is SumAggregationBuilder -> { + build.addAggregator(agg) + } + is MaxAggregationBuilder -> { + build.addAggregator(agg) + } + + is MinAggregationBuilder -> { + build.addAggregator(agg) + } + + is ValueCountAggregationBuilder -> { + // Append .rollup.value_count to the name so it is identified in the response interceptor + val newValueCount = ValueCountAggregationBuilder("${agg.name}.rollup.value_count") + newValueCount.field(agg.field()) + build.addAggregator(newValueCount) + } + is AvgAggregationBuilder -> { + // Split the avg into a value_count and a sum agg to be put back together in the response interceptor + // Need to do this since .count and .sum are private in InternalAvg + val avgValueCount = ValueCountAggregationBuilder("${agg.name}.rollup.avg.value_count") + avgValueCount.field(agg.field()) + build.addAggregator(avgValueCount) + val avgSumCount = SumAggregationBuilder("${agg.name}.rollup.avg.sum") + avgSumCount.field(agg.field()) + build.addAggregator(avgSumCount) + } + + else -> throw IllegalArgumentException("The ${agg.type} aggregation is not currently supported in rollups") + } + } + return build + } + + /** + * Wraps the original aggregations into a bucket aggregation based on the fixed interval + * to make the response more granular and let the response interceptor remove overlap + * @params request: ShardSearchRequest, rollupJob: Rollup + */ + fun breakIntoBuckets(request: ShardSearchRequest, rollupJob: Rollup) { + val oldAggs = modifyRollupAggs(request.source().aggregations().aggregatorFactories) + var dateSourceField: String = "" + var rollupInterval: String = "" + for (dim in rollupJob.dimensions) { + if (dim is DateHistogram) { + dateSourceField = dim.sourceField + rollupInterval = dim.fixedInterval!!
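// Editor's note: a sketch of the rewritten request body breakIntoBuckets produces, assuming a
// single "sum_passenger_count" sum aggregation and a "1h" fixed interval (names from the tests below):
//   "aggregations": { "interceptor_interval_data": {
//       "date_histogram": { "field": "tpep_pickup_datetime", "calendar_interval": "1h", "format": "epoch_millis" },
//       "aggregations": { "sum_passenger_count": { "sum": { "field": "passenger_count" } } } } }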
+ break + } + } + // Wraps all existing aggs in bucket aggregation + // Notifies the response interceptor that was rewritten since agg name is interceptor_interval_data + // Edge case if User selected interceptor_interval_data as the aggregation name :/ + val intervalAgg = AggregationBuilders.dateHistogram("interceptor_interval_data") + .field(dateSourceField) + .calendarInterval(DateHistogramInterval(rollupInterval)) + .format("epoch_millis") + .subAggregations(oldAggs) + // Changes aggregation in source to new agg + request.source(request.source().changeAggregations(listOf(intervalAgg))) + return + } + + @Suppress("SpreadOperator", "NestedBlockDepth") override fun interceptHandler( action: String, executor: String, @@ -82,31 +224,35 @@ class RollupInterceptor( return object : TransportRequestHandler { override fun messageReceived(request: T, channel: TransportChannel, task: Task) { if (searchEnabled && request is ShardSearchRequest) { - val index = request.shardId().indexName - val isRollupIndex = isRollupIndex(index, clusterService.state()) - if (isRollupIndex) { - if (request.source().size() != 0) { - throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") + val isDataStream = (request.indices().any { IndexUtils.isDataStream(it, clusterService.state()) }) + val shardRequestIndex = request.shardId().indexName + // isRollupIndex throws an exception if the request is on a data stream + val isRollupIndex = if (isDataStream) false else isRollupIndex(shardRequestIndex, clusterService.state()) + val (containsRollup, rollupJob) = if (isDataStream) Pair(false, null) else originalSearchContainsRollup(request) + // Only modifies rollup searches and avoids internal client calls + if (containsRollup || isRollupIndex) { + val (concreteRollupIndicesArray, concreteLiveIndicesArray) = getConcreteIndices(request) + /* Avoid infinite interceptor loop: + if there is an internal client call made in the response interceptor there is only 1 index. + Therefore, conditions are not met for api to combine rollup and live data + */ + val isMultiSearch = (concreteRollupIndicesArray.isNotEmpty() && concreteLiveIndicesArray.isNotEmpty()) + if (isMultiSearch && request.source().aggregations() != null && !isRequestRewrittenIntoBuckets(request)) { + // Break apart request to remove overlapping parts + breakIntoBuckets(request, rollupJob!!) } - - val indices = request.indices().map { it.toString() }.toTypedArray() - val concreteIndices = indexNameExpressionResolver - .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) - // To extract fields from QueryStringQueryBuilder we need concrete source index name. 
- val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) - ?: throw IllegalArgumentException("No rollup job associated with target_index") - val queryFieldMappings = getQueryMetadata( - request.source().query(), - getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state()) - ) - val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) - val fieldMappings = queryFieldMappings + aggregationFieldMappings - - val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings) - - // only rebuild if there is necessity to rebuild - if (fieldMappings.isNotEmpty()) { - rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs) + // Rewrite the request to fit rollup format if not already done previously + if (isRollupIndex && !isRequestRollupFormat(request)) { + /* Client calls from the response interceptor require request bodies of 1, + otherwise do not allow size > 0 for rollup indices + */ + if (!allowRequest(request) && request.source().size() != 0) { + throw IllegalArgumentException( + "Rollup search must have size explicitly set to 0, " + + "but found ${request.source().size()}" + ) + } + rewriteRollupRequest(request, rollupJob!!, concreteRollupIndicesArray) } } } @@ -115,6 +261,48 @@ class RollupInterceptor( } } + /** + * @return Pair (concreteRollupIndices: Array, concreteLiveIndicesArray: Array) + */ + @Suppress("SpreadOperator") + fun getConcreteIndices(request: ShardSearchRequest): Pair, Array> { + val indices = request.indices().map { it.toString() }.toTypedArray() + val concreteIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + val concreteRollupIndexNames = mutableListOf() + val concreteLiveIndexNames = mutableListOf() + for (indexName in concreteIndices) { + if (isRollupIndex(indexName, clusterService.state())) { + concreteRollupIndexNames.add(indexName) + } else { + concreteLiveIndexNames.add(indexName) + } + } + val concreteRollupIndicesArray = concreteRollupIndexNames.toTypedArray() + val concreteLiveIndicesArray = concreteLiveIndexNames.toTypedArray() + return Pair(concreteRollupIndicesArray, concreteLiveIndicesArray) + } + + /** + * Modifies ShardSearchRequest to fit rollup index format + */ + fun rewriteRollupRequest(request: ShardSearchRequest, rollupJob: Rollup, concreteRollupIndicesArray: Array) { + // To extract fields from QueryStringQueryBuilder we need concrete source index name. 
+ val queryFieldMappings = getQueryMetadata( + request.source().query(), + getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state()) + ) + val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) + val fieldMappings = queryFieldMappings + aggregationFieldMappings + + val allMatchingRollupJobs = validateIndicies(concreteRollupIndicesArray, fieldMappings) + + // only rebuild if there is necessity to rebuild + if (fieldMappings.isNotEmpty()) { + rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs) + } + } + fun getConcreteSourceIndex(sourceIndex: String, resolver: IndexNameExpressionResolver, clusterState: ClusterState): String { val concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN, sourceIndex) if (concreteIndexNames.isEmpty()) { @@ -142,7 +330,7 @@ class RollupInterceptor( var allMatchingRollupJobs: Map> = mapOf() for (concreteIndex in concreteIndices) { val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() - ?: throw IllegalArgumentException("Not all indices have rollup job") + ?: throw IllegalArgumentException("Not all indices have rollup job, missing on $concreteIndex") val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index f6d749fbb..86e7342be 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -62,6 +62,9 @@ import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuild import org.opensearch.search.aggregations.metrics.SumAggregationBuilder import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.ZonedDateTime const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time" const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis" @@ -464,3 +467,83 @@ fun parseRollup(response: GetResponse, xContentRegistry: NamedXContentRegistry = return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse) } +// Returns a SearchSourceBuilder with different aggregations but the rest of the properties are the same +@Suppress("ComplexMethod") +fun SearchSourceBuilder.changeAggregations(aggregationBuilderCollection: Collection): SearchSourceBuilder { + val ssb = SearchSourceBuilder() + aggregationBuilderCollection.forEach { ssb.aggregation(it) } + if (this.explain() != null) ssb.explain(this.explain()) + if (this.ext() != null) ssb.ext(this.ext()) + ssb.fetchSource(this.fetchSource()) + this.docValueFields()?.forEach { ssb.docValueField(it.field, it.format) } + ssb.storedFields(this.storedFields()) + if (this.from() >= 0) ssb.from(this.from()) + ssb.highlighter(this.highlighter()) + this.indexBoosts()?.forEach { ssb.indexBoost(it.index, it.boost) } + if (this.minScore() != null) ssb.minScore(this.minScore()) + if (this.postFilter() != null) ssb.postFilter(this.postFilter()) + ssb.profile(this.profile()) + if (this.query() != null) ssb.query(this.query()) + this.rescores()?.forEach { ssb.addRescorer(it) } + 
this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) } + if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter()) + if (this.slice() != null) ssb.slice(this.slice()) + if (this.size() >= 0) ssb.size(this.size()) + this.sorts()?.forEach { ssb.sort(it) } + if (this.stats() != null) ssb.stats(this.stats()) + if (this.suggest() != null) ssb.suggest(this.suggest()) + if (this.terminateAfter() >= 0) ssb.terminateAfter(this.terminateAfter()) + if (this.timeout() != null) ssb.timeout(this.timeout()) + ssb.trackScores(this.trackScores()) + this.trackTotalHitsUpTo()?.let { ssb.trackTotalHitsUpTo(it) } + if (this.version() != null) ssb.version(this.version()) + if (this.seqNoAndPrimaryTerm() != null) ssb.seqNoAndPrimaryTerm(this.seqNoAndPrimaryTerm()) + if (this.collapse() != null) ssb.collapse(this.collapse()) + return ssb +} +@Suppress("MagicNumber") +fun convertDateStringToEpochMillis(dateString: String): Long { + val parts = dateString.split(" ") + require(parts.size == 2) { "Date was not in the expected format: yyyy-MM-dd HH:mm:ss" } + val dateParts = parts[0].split("-") + val timeParts = parts[1].split(":") + + require((dateParts.size == 3 && timeParts.size == 3)) { "Date was not in the expected format: yyyy-MM-dd HH:mm:ss" } + val year = dateParts[0].toInt() + val month = dateParts[1].toInt() + val day = dateParts[2].toInt() + + val hour = timeParts[0].toInt() + val minute = timeParts[1].toInt() + val second = timeParts[2].toInt() + + val localDateTime = LocalDateTime.of(year, month, day, hour, minute, second) + val instant = localDateTime.toInstant(ZoneOffset.UTC) + return instant.toEpochMilli() +} +@Suppress("MagicNumber") +fun convertFixedIntervalStringToMs(fixedInterval: String): Long { + // Possible types are ms, s, m, h, d, w + val regex = """(\d+)([a-zA-Z]+)""".toRegex() + val matchResult = regex.find(fixedInterval) + ?: throw IllegalArgumentException("Invalid interval format: $fixedInterval") + + val numericValue = matchResult.groupValues[1].toLong() + val intervalType = matchResult.groupValues[2] + + val milliseconds = when (intervalType) { + "ms" -> numericValue + "s" -> numericValue * 1000L + "m" -> numericValue * 60 * 1000L + "h" -> numericValue * 60 * 60 * 1000L + "d" -> numericValue * 24 * 60 * 60 * 1000L + "w" -> numericValue * 7 * 24 * 60 * 60 * 1000L + else -> throw IllegalArgumentException("Unsupported interval type: $intervalType") + } + + return milliseconds +} + +fun zonedDateTimeToMillis(zonedDateTime: ZonedDateTime): Long { + return zonedDateTime.toInstant().toEpochMilli() +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt new file mode 100644 index 000000000..cda9f8dfb --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt @@ -0,0 +1,753 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.interceptor + +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.rollup.RollupRestTestCase +import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.rollup.model.RollupMetadata +import
org.opensearch.indexmanagement.rollup.model.RollupMetrics +import org.opensearch.indexmanagement.rollup.model.metric.Average +import org.opensearch.indexmanagement.rollup.model.metric.Max +import org.opensearch.indexmanagement.rollup.model.metric.Min +import org.opensearch.indexmanagement.rollup.model.metric.Sum +import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.indexmanagement.waitFor +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.rest.RestStatus +import java.time.Instant +import java.time.temporal.ChronoUnit + +@Suppress("UNCHECKED_CAST") +class ResponseInterceptorIT : RollupRestTestCase() { + fun `test search a live index and rollup index with no overlap`() { + generateNYCTaxiData("source_rollup_search") + val rollup = Rollup( + id = "base_case1_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_rollup_search", + targetIndex = "target_rollup_search", + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("passenger_count", "passenger_count") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) 
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + // Get expected aggregation values by searching live data before deletion + var aggReq = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sum_passenger_count": { + "sum": { + "field": "passenger_count" + } + }, + "max_passenger_count": { + "max": { + "field": "passenger_count" + } + }, + "min_passenger_count": { + "min": { + "field": "passenger_count" + } + }, + "avg_passenger_count": { + "avg": { + "field": "passenger_count" + } + }, + "count_passenger_count": { + "value_count": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + var searchResponse = client().makeRequest("POST", "/source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue("Could not search inital data for expected values", searchResponse.restStatus() == RestStatus.OK) + var expectedAggs = searchResponse.asMap()["aggregations"] as Map> + val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] + val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] + val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] + val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] + val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] + refreshAllIndices() + // Split data at 1546304400000 or Jan 01 2019 01:00:00 + // Delete half the values from live data simulating an ism job deleting old data + var r = """ + { + "query": { + "range": { + "tpep_pickup_datetime": { + "lt": 1546304400000, + "format": "epoch_millis", + "time_zone": "+00:00" + } + } + } + } + """.trimIndent() + var deleteLiveResponse = client().makeRequest( + "POST", + "source_rollup_search/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity(r, ContentType.APPLICATION_JSON) + ) + + assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK) + + // Delete half the values from rollup data + r = """ + { + "query": { + "range": { + "tpep_pickup_datetime": { + "gte": 1546304400000, + "format": "epoch_millis", + "time_zone": "+00:00" + } + } + } + } + """.trimIndent() + var deleteRollupResponse = client().makeRequest( + "POST", + "target_rollup_search/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity(r, ContentType.APPLICATION_JSON) + ) + + assertTrue("Could not delete rollup data", deleteRollupResponse.restStatus() == RestStatus.OK) + var searchBothResponse = client().makeRequest("POST", "/target_rollup_search,source_rollup_search/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue(searchBothResponse.restStatus() == RestStatus.OK) + var responseAggs = searchBothResponse.asMap()["aggregations"] as Map> + assertEquals( + "sum agg is wrong", + expectedSum, + responseAggs.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "max agg is wrong", + expectedMax, + responseAggs.getValue("max_passenger_count")["value"] + ) + assertEquals( + "min agg is wrong", + expectedMin, + responseAggs.getValue("min_passenger_count")["value"] + ) + assertEquals( + "value_count is wrong", + expectedCount, + responseAggs.getValue("count_passenger_count")["value"] + ) + assertEquals( + "avg is wrong", + expectedAvg, + responseAggs.getValue("avg_passenger_count")["value"] + ) + } +// // Edge Case + fun `test search a live index with no data and rollup index with data`() { + generateNYCTaxiData("source_rollup_search_no_data_case") + val 
rollup = Rollup( + id = "base_case_2_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_rollup_search_no_data_case", + targetIndex = "target_rollup_search_no_data_case", + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("passenger_count", "passenger_count") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + // Get expected aggregation values by searching live data before deletion + var aggReq = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sum_passenger_count": { + "sum": { + "field": "passenger_count" + } + }, + "max_passenger_count": { + "max": { + "field": "passenger_count" + } + }, + "min_passenger_count": { + "min": { + "field": "passenger_count" + } + }, + "avg_passenger_count": { + "avg": { + "field": "passenger_count" + } + }, + "count_passenger_count": { + "value_count": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + var searchResponse = client().makeRequest("POST", "/source_rollup_search_no_data_case/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue("Could not search inital data for expected values", searchResponse.restStatus() == RestStatus.OK) + var expectedAggs = searchResponse.asMap()["aggregations"] as Map> + val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] + val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] + val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] + val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] + val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] + refreshAllIndices() + // Delete values from live index + var deleteResponse = client().makeRequest( + "POST", + "source_rollup_search_no_data_case/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity("""{"query": {"match_all": {}}}""", ContentType.APPLICATION_JSON) + ) + assertTrue(deleteResponse.restStatus() == RestStatus.OK) + var searchBothResponse = client().makeRequest("POST", "/target_rollup_search_no_data_case,source_rollup_search_no_data_case/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue(searchBothResponse.restStatus() == RestStatus.OK) + var responseAggs = searchBothResponse.asMap()["aggregations"] as Map> + assertEquals( + "sum agg is wrong", + expectedSum, + responseAggs.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "max agg is wrong", + expectedMax, + responseAggs.getValue("max_passenger_count")["value"] + ) + assertEquals( + "min agg is wrong", + expectedMin, + responseAggs.getValue("min_passenger_count")["value"] + ) + assertEquals( + "value_count is 
wrong", + expectedCount, + responseAggs.getValue("count_passenger_count")["value"] + ) + assertEquals( + "avg is wrong", + expectedAvg, + responseAggs.getValue("avg_passenger_count")["value"] + ) + } + fun `test search a live index and rollup index with data overlap`() { + generateNYCTaxiData("source_rollup_search_data_overlap_case") + val rollup = Rollup( + id = "case2_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_rollup_search_data_overlap_case", + targetIndex = "target_rollup_search_data_overlap_case", + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("passenger_count", "passenger_count") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + // Get expected aggregation values by searching live data before deletion + var aggReq = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sum_passenger_count": { + "sum": { + "field": "passenger_count" + } + }, + "max_passenger_count": { + "max": { + "field": "passenger_count" + } + }, + "min_passenger_count": { + "min": { + "field": "passenger_count" + } + }, + "avg_passenger_count": { + "avg": { + "field": "passenger_count" + } + }, + "count_passenger_count": { + "value_count": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + var expectedSearchResponse = client().makeRequest("POST", "/source_rollup_search_data_overlap_case/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue("Could not search inital data for expected values", expectedSearchResponse.restStatus() == RestStatus.OK) + var expectedAggs = expectedSearchResponse.asMap()["aggregations"] as Map> + val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] + val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] + val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] + val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] + val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] + + refreshAllIndices() + // Split data at 1546304400000 or Jan 01 2019 01:00:00 + // Delete half the values from live data simulating an ism job deleting old data + var r = """ + { + "query": { + "range": { + "tpep_pickup_datetime": { + "lt": 1546304400000, + "format": "epoch_millis", + "time_zone": "+00:00" + } + } + } + } + """.trimIndent() + var deleteLiveResponse = client().makeRequest( + "POST", + "source_rollup_search_data_overlap_case/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity(r, ContentType.APPLICATION_JSON) + ) + + assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK) + // Rollup index is complete overlap of live data + // 
+        // Search both and check if the time series data is the same
+        val searchBothResponse = client().makeRequest("POST", "/target_rollup_search_data_overlap_case,source_rollup_search_data_overlap_case/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
+        assertTrue(searchBothResponse.restStatus() == RestStatus.OK)
+        val responseAggs = searchBothResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
+        assertEquals(
+            "sum agg is wrong",
+            expectedSum,
+            responseAggs.getValue("sum_passenger_count")["value"]
+        )
+        assertEquals(
+            "max agg is wrong",
+            expectedMax,
+            responseAggs.getValue("max_passenger_count")["value"]
+        )
+        assertEquals(
+            "min agg is wrong",
+            expectedMin,
+            responseAggs.getValue("min_passenger_count")["value"]
+        )
+        assertEquals(
+            "value_count is wrong",
+            expectedCount,
+            responseAggs.getValue("count_passenger_count")["value"]
+        )
+        assertEquals(
+            "avg is wrong",
+            expectedAvg,
+            responseAggs.getValue("avg_passenger_count")["value"]
+        )
+    }
+    // Note: this test breaks subsequent tests when run in the same suite, but passes when executed on its own
+    fun `test search multiple live data indices and a rollup data index with overlap`() {
+        generateNYCTaxiData("source_rollup_search_multi_index_case")
+        val rollup = Rollup(
+            id = "case3_rollup_search",
+            enabled = true,
+            schemaVersion = 1L,
+            jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
+            jobLastUpdatedTime = Instant.now(),
+            jobEnabledTime = Instant.now(),
+            description = "basic search test",
+            sourceIndex = "source_rollup_search_multi_index_case",
+            targetIndex = "target_rollup_search_multi_index_case",
+            metadataID = null,
+            roles = emptyList(),
+            pageSize = 10,
+            delay = 0,
+            continuous = false,
+            dimensions = listOf(
+                DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
+                Terms("passenger_count", "passenger_count")
+            ),
+            metrics = listOf(
+                RollupMetrics(
+                    sourceField = "passenger_count", targetField = "passenger_count",
+                    metrics = listOf(
+                        Sum(), Min(), Max(),
+                        ValueCount(), Average()
+                    )
+                )
+            )
+        ).let { createRollup(it, it.id) }
+
+        updateRollupStartTime(rollup)
+
+        waitFor {
+            val rollupJob = getRollup(rollupId = rollup.id)
+            assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
+            val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
+            assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
+        }
+
+        refreshAllIndices()
+        // Split data at 1546304400000, i.e. Jan 01 2019 01:00:00 UTC
+        // Delete half the values from live data, simulating an ISM job deleting old data
+        val deleteReq = """
+            {
+                "query": {
+                    "range": {
+                        "tpep_pickup_datetime": {
+                            "lt": 1546304400000,
+                            "format": "epoch_millis",
+                            "time_zone": "+00:00"
+                        }
+                    }
+                }
+            }
+        """.trimIndent()
+        val deleteLiveResponse = client().makeRequest(
+            "POST",
+            "source_rollup_search_multi_index_case/_delete_by_query",
+            mapOf("refresh" to "true"),
+            StringEntity(deleteReq, ContentType.APPLICATION_JSON)
+        )
+
+        assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK)
+
+        // Insert more live data
+        generateNYCTaxiData("source_rollup_search_multi_index_case2")
+        // Expected values discard the overlapping rollup index completely, since live data now covers its full range
+        val aggReq = """
+            {
+                "size": 0,
+                "query": {
+                    "match_all": {}
+                },
+                "aggs": {
+                    "sum_passenger_count": {
+                        "sum": {
+                            "field": "passenger_count"
+                        }
+                    },
+                    "max_passenger_count": {
+                        "max": {
+                            "field": "passenger_count"
+                        }
+                    },
+                    "min_passenger_count": {
+                        "min": {
+                            "field": "passenger_count"
+                        }
+                    },
+                    "avg_passenger_count": {
+                        "avg": {
+                            "field": "passenger_count"
+                        }
+                    },
+                    "count_passenger_count": {
+                        "value_count": {
+                            "field": "passenger_count"
+                        }
+                    }
+                }
+            }
+        """.trimIndent()
+        val searchResponse = client().makeRequest("POST", "/source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON))
+        assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK)
+        val expectedAggs = searchResponse.asMap()["aggregations"] as Map<String, Map<String, Any>>
+        val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"]
+        val expectedMax = expectedAggs.getValue("max_passenger_count")["value"]
+        val expectedMin = expectedAggs.getValue("min_passenger_count")["value"]
+        val expectedCount = expectedAggs.getValue("count_passenger_count")["value"]
+        val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"]
+        refreshAllIndices()
+
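+        // (Assumed behavior: source_rollup_search_multi_index_case2 spans the rollup's entire
+        // time range, so the interceptor should answer from live data alone and ignore the
+        // rollup index; see computeAggregationsWithoutOverlap in this PR's ResponseInterceptor.)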
+ assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + // Split data at 1546304400000 or Jan 01 2019 01:00:00 + // Delete half the values from live data simulating an ism job deleting old data + var r = """ + { + "query": { + "range": { + "tpep_pickup_datetime": { + "lt": 1546304400000, + "format": "epoch_millis", + "time_zone": "+00:00" + } + } + } + } + """.trimIndent() + var deleteLiveResponse = client().makeRequest( + "POST", + "source_rollup_search_multi_index_case/_delete_by_query", + mapOf("refresh" to "true"), + StringEntity(r, ContentType.APPLICATION_JSON) + ) + + assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK) + + // Insert more live data + generateNYCTaxiData("source_rollup_search_multi_index_case2") + // Expected values would discard the overlapping rollup index completely + var aggReq = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sum_passenger_count": { + "sum": { + "field": "passenger_count" + } + }, + "max_passenger_count": { + "max": { + "field": "passenger_count" + } + }, + "min_passenger_count": { + "min": { + "field": "passenger_count" + } + }, + "avg_passenger_count": { + "avg": { + "field": "passenger_count" + } + }, + "count_passenger_count": { + "value_count": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + var searchResponse = client().makeRequest("POST", "/source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) + var expectedAggs = searchResponse.asMap()["aggregations"] as Map> + val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] + val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] + val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] + val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] + val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] + refreshAllIndices() + + // Search all 3 indices to check if overlap was removed + var searchAllResponse = client().makeRequest("POST", "/target_rollup_search_multi_index_case,source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue(searchAllResponse.restStatus() == RestStatus.OK) + var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> + assertEquals( + "sum agg is wrong", + expectedSum, + responseAggs.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "max agg is wrong", + expectedMax, + responseAggs.getValue("max_passenger_count")["value"] + ) + assertEquals( + "min agg is wrong", + expectedMin, + responseAggs.getValue("min_passenger_count")["value"] + ) + assertEquals( + "value_count is wrong", + expectedCount, + responseAggs.getValue("count_passenger_count")["value"] + ) + assertEquals( + "avg is wrong", + expectedAvg, + responseAggs.getValue("avg_passenger_count")["value"] + ) + } + fun `test search aliased live indices data and rollup data`() { + /* add later */ + // Create 3 indices with 5,000 docs each nyc-taxi-data-1, nyc-taxi-data-2, nyc-taxi-data-3 + generateNYCTaxiData("nyc-taxi-data-1") + generateNYCTaxiData("nyc-taxi-data-2") + generateNYCTaxiData("nyc-taxi-data-3") + // Add them to alias nyc-taxi-data + val 
createAliasReq = """ + { + "actions": [ + { + "add": { + "index": "nyc-taxi-data-1", + "alias": "nyc-taxi-data" + } + }, + { + "add": { + "index": "nyc-taxi-data-2", + "alias": "nyc-taxi-data" + } + }, + { + "add": { + "index": "nyc-taxi-data-3", + "alias": "nyc-taxi-data" + } + } + ] + } + """.trimIndent() + val createAliasRes = client().makeRequest( + "POST", + "_aliases", + mapOf(), + StringEntity(createAliasReq, ContentType.APPLICATION_JSON) + ) + assertTrue("Could not create alias", createAliasRes.restStatus() == RestStatus.OK) + // Rollup alias into rollup-nyc-taxi-data + val rollup = Rollup( + id = "alias_rollup_search", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "nyc-taxi-data", + targetIndex = "rollup-nyc-taxi-data", + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("passenger_count", "passenger_count") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + refreshAllIndices() + // Find expected values by searching nyc-taxi-data + var aggReq = """ + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sum_passenger_count": { + "sum": { + "field": "passenger_count" + } + }, + "max_passenger_count": { + "max": { + "field": "passenger_count" + } + }, + "min_passenger_count": { + "min": { + "field": "passenger_count" + } + }, + "avg_passenger_count": { + "avg": { + "field": "passenger_count" + } + }, + "count_passenger_count": { + "value_count": { + "field": "passenger_count" + } + } + } + } + """.trimIndent() + var searchResponse = client().makeRequest("POST", "/nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) + var expectedAggs = searchResponse.asMap()["aggregations"] as Map> + val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] + val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] + val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] + val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] + val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] + refreshAllIndices() + // Validate result from searching rollup-nyc-taxi-data, searching nyc-taxi-data + var searchAllResponse = client().makeRequest("POST", "/rollup-nyc-taxi-data,nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) + assertTrue(searchAllResponse.restStatus() == RestStatus.OK) + var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> + assertEquals( + "sum agg is wrong", + expectedSum, + responseAggs.getValue("sum_passenger_count")["value"] + ) + assertEquals( + "max agg 
is wrong", + expectedMax, + responseAggs.getValue("max_passenger_count")["value"] + ) + assertEquals( + "min agg is wrong", + expectedMin, + responseAggs.getValue("min_passenger_count")["value"] + ) + assertEquals( + "value_count is wrong", + expectedCount, + responseAggs.getValue("count_passenger_count")["value"] + ) + assertEquals( + "avg is wrong", + expectedAvg, + responseAggs.getValue("avg_passenger_count")["value"] + ) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index e42295418..105cc20c6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1045,21 +1045,6 @@ class RollupInterceptorIT : RollupRestTestCase() { } } """.trimIndent() - // Search 1 non-rollup index and 1 rollup - val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) - assertTrue(searchResult1.restStatus() == RestStatus.OK) - val failures = extractFailuresFromSearchResponse(searchResult1) - assertNotNull(failures) - assertEquals(1, failures?.size) - assertEquals( - "Searching multiple indices where one is rollup and other is not, didn't return failure", - "illegal_argument_exception", failures?.get(0)?.get("type") ?: "Didn't find failure type in search response" - - ) - assertEquals( - "Searching multiple indices where one is rollup and other is not, didn't return failure", - "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response" - ) // Search 2 rollups with different mappings try { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt index 43fc6dca5..cfef1f7bc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt @@ -37,7 +37,6 @@ class RestDeleteRollupActionIT : RollupRestTestCase() { assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) } } - @Throws(Exception::class) fun `test deleting a rollup that doesn't exist and config index doesnt exist`() { try { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt index c063a3487..fdb5cece6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtilsTests.kt @@ -31,8 +31,13 @@ import org.opensearch.indexmanagement.rollup.randomValueCount import org.opensearch.indexmanagement.transform.randomAggregationBuilder import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.rest.OpenSearchRestTestCase +import java.time.ZoneId +import java.time.ZonedDateTime +import kotlin.test.assertFails +import kotlin.test.assertFailsWith class RollupUtilsTests : OpenSearchTestCase() { @@ -230,4 
+        val actualMillis = convertDateStringToEpochMillis(dateString)
+        assertEquals(expectedMillis, actualMillis)
+
+        // Test that an invalid date format throws an error
+        assertFails { convertDateStringToEpochMillis("invalid format") }
+    }
+    fun `test convertFixedIntervalStringToMs`() {
+        // Test ms
+        val msString = "5ms"
+        val expectedMs = 5L
+        assertEquals("ms conversion is wrong", expectedMs, convertFixedIntervalStringToMs(msString))
+        // Test s
+        val sString = "5s"
+        val expectedS = 5000L
+        assertEquals("s conversion is wrong", expectedS, convertFixedIntervalStringToMs(sString))
+        // Test m
+        val mString = "3m"
+        val expectedM = 180000L
+        assertEquals("m conversion is wrong", expectedM, convertFixedIntervalStringToMs(mString))
+        // Test h
+        val hString = "2h"
+        val expectedH = 7200000L
+        assertEquals("h conversion is wrong", expectedH, convertFixedIntervalStringToMs(hString))
+        // Test d
+        val dString = "1d"
+        val expectedD = 86400000L
+        assertEquals("d conversion is wrong", expectedD, convertFixedIntervalStringToMs(dString))
+        // Test w
+        val wString = "1w"
+        val expectedW = 604800000L
+        assertEquals("w conversion is wrong", expectedW, convertFixedIntervalStringToMs(wString))
+        // Test that an invalid interval throws an error
+        val invalid = ";)"
+        assertFailsWith<IllegalArgumentException> {
+            convertFixedIntervalStringToMs(invalid)
+        }
+    }
+    fun `test zonedDateTimeToMillis`() {
+        val zonedDateTime = ZonedDateTime.of(2023, 7, 18, 12, 30, 0, 0, ZoneId.of("UTC"))
+        val expectedMillis = 1689683400000L // ms since epoch of the zonedDateTime
+        val actualMillis = zonedDateTimeToMillis(zonedDateTime)
+        assertEquals(expectedMillis, actualMillis)
+    }
 }