diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml
index 9b548ee14..0bb068e78 100644
--- a/.github/workflows/multi-node-test-workflow.yml
+++ b/.github/workflows/multi-node-test-workflow.yml
@@ -22,7 +22,7 @@ jobs:
       - name: Checkout Branch
         uses: actions/checkout@v2
       - name: Run integration tests with multi node config
-        run: ./gradlew integTest
+        run: ./gradlew integTest -PnumNodes=3
       - name: Upload failed logs
         uses: actions/upload-artifact@v2
         if: failure()
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
index 19b6bccdc..881210c95 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
@@ -53,6 +53,13 @@ import java.time.ZonedDateTime
 import kotlin.math.max
 import kotlin.math.min
 
+/**
+ * The ResponseInterceptor class modifies responses when the search API is triggered on both rollup and live data:
+ * 1. Checks whether the request was rewritten in the RollupInterceptor into more granular buckets
+ * 2. In findOverlap, checks for overlap between live data and rollup data and returns the interval to include
+ * 3. computeAggregationsWithoutOverlap() iterates through the buckets and recomputes the aggregations in the expected format
+ * 4. Returns a new response for each shard, to be combined later
+ **/
 class ResponseInterceptor(
     val clusterService: ClusterService,
     val settings: Settings,
@@ -126,7 +133,9 @@ class ResponseInterceptor(
         return Pair(rollupIndices.toTypedArray(), liveIndices.toTypedArray())
     }
 
-    // Calculated the end time for the current shard index if it is a rollup index with data overlapp
+    /**
+     * Calculates the end time for the current shard index if it is a rollup index with data overlap
+     **/
     @Suppress("SpreadOperator")
     suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
         // Build search request to find the maximum rollup timestamp <= liveDataStartPoint
@@ -145,16 +154,19 @@
         try {
             return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long
         } catch (e: Exception) {
-            logger.error("Not able to retrieve intersection time from response: $e")
+            logger.error("Not able to retrieve intersection time from response: ", e)
         }
         return 0L // dummy :P
     }
-
-// Returns Pair(startRange: Long, endRange: Long)
-// Note startRange is inclusive and endRange is exclusive, they are Longs because the type is epoch milliseconds
+    /**
+     * Returns Pair(startRange: Long, endRange: Long).
+     * Note startRange is inclusive and endRange is exclusive; they are Longs because the type is epoch milliseconds.
+     **/
+    // TODO intercept at the index level instead of the shard level to avoid redundant client calls for every index
     @Suppress("LongMethod", "SpreadOperator")
     suspend fun findOverlap(response: QuerySearchResult): Pair<Long, Long> {
-        val job: Rollup = getRollupJob(response)!! // maybe throw a try catch later
+        // TODO add more error logs and try catch statements for client calls
+        val job: Rollup = getRollupJob(response)!!
         var dateSourceField: String = ""
         var dateTargetField: String = ""
        var rollupInterval: String? = ""
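
Reviewer note, not part of the diff: a minimal, dependency-free sketch of the interval contract the KDoc above describes. `getRollupEndTime` builds a search for the maximum rollup timestamp <= the live-data start point; the sketch models that lookup with a plain list. `rollupEndTime` and the sample values are hypothetical names for illustration, not the PR's API.

```kotlin
// Sketch: pick the end of the rollup range as the largest rollup bucket
// timestamp that is <= the earliest live-data timestamp (all epoch millis).
fun rollupEndTime(rollupTimestamps: List<Long>, liveDataStartPoint: Long): Long =
    rollupTimestamps.filter { it <= liveDataStartPoint }.maxOrNull() ?: 0L

fun main() {
    val rollupBuckets = listOf(0L, 3_600_000L, 7_200_000L) // hourly rollup buckets
    val liveStart = 7_500_000L // earliest live document
    // findOverlap's contract: startRange is inclusive, endRange is exclusive
    val range = 0L to rollupEndTime(rollupBuckets, liveStart)
    println("keep rollup data in [${range.first}, ${range.second})") // [0, 7200000)
}
```
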
= "" @@ -181,7 +193,7 @@ class ResponseInterceptor( val maxRollupDateRequest = SearchRequest() .source(searchSourceBuilder) .indices(*rollupIndices) // add all rollup indices to this request - logger.info("Sending maxRollupDate request for $shardRequestIndex") + logger.debug("Sending maxRollupDate request for $shardRequestIndex") val maxRollupDateResponse: SearchResponse? = client.suspendUntil { search(maxRollupDateRequest, it) } // Build search request to find the minimum date in all live indices sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC) @@ -200,7 +212,7 @@ class ResponseInterceptor( } else { // shard index is live index minLiveDateRequest.indices(shardRequestIndex) } - logger.info("Sending minLiveData request for $shardRequestIndex") + logger.debug("Sending minLiveData request for $shardRequestIndex") var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) } val foundMinAndMax = (minLiveDateResponse != null && maxRollupDateResponse != null) // if they overlap find part to exclude @@ -213,7 +225,7 @@ class ResponseInterceptor( // If intersection found on rollup index, remove overlap if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) { // Start at 0, end at live data - logger.info("ronsax sending request to find rollup endtime for index: $shardRequestIndex") + logger.debug("Sending request to find rollup endtime for index: $shardRequestIndex") val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField) return Pair(0L, endTime) } @@ -222,7 +234,7 @@ class ResponseInterceptor( return Pair(0L, Long.MAX_VALUE) } - // Depending on which metric the aggregation is computer data differently + // Depending on which metric the aggregation is, computation is different @Suppress("ReturnCount") fun computeRunningValue(agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any): Pair { when (agg) { @@ -289,7 +301,7 @@ class ResponseInterceptor( } } } else { // Value count calc - // Won't double count + // Put in set to avoid adding the aggregation twice addedAggregations += modifiedName val originalName = modifiedName.removeSuffix(".rollup.avg.value_count") val avgCount = value as Long @@ -305,7 +317,9 @@ class ResponseInterceptor( throw NullPointerException("Can't calculate avg agg for rollup index") } -// Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed + /** + * Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed + */ @Suppress("NestedBlockDepth") fun computeAggregationsWithoutOverlap(intervalAggregations: InternalAggregations, start: Long, end: Long): InternalAggregations { // Store the running values of the aggregations being computed @@ -333,7 +347,7 @@ class ResponseInterceptor( } } - // Create a new InternalAggregations with sum values + // Create a new InternalAggregations with recomputed values discarding the overlap val allAggregations = mutableListOf() val addedAggregations = mutableSetOf() // avoid repeating the same aggregations for ((aggName, data) in aggValues) {