
Commit

resolved comments on the PR
Signed-off-by: Ronnak Saxena <[email protected]>
ronnaksaxena committed Sep 13, 2023
1 parent 06b7b02 commit acf1c4b
Showing 2 changed files with 28 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
@@ -22,7 +22,7 @@ jobs:
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run integration tests with multi node config
-        run: ./gradlew integTest
+        run: ./gradlew integTest -PnumNodes=3
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
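For context, -PnumNodes=3 is a Gradle project property rather than a test flag: the build script reads it to decide how many nodes the local test cluster starts, which is what makes this CI run a true multi-node test. A minimal sketch of that wiring in Gradle Kotlin DSL (illustrative only: the property name comes from this diff, but the repo's actual build script is Groovy and its block names may differ):

// build.gradle.kts sketch -- hypothetical wiring, not this repo's actual
// (Groovy) build script; accessors may differ by plugin version.
val clusterNodes = (findProperty("numNodes") as String? ?: "1").toInt()

testClusters {
    named("integTest") {
        numberOfNodes = clusterNodes // cluster size follows -PnumNodes
    }
}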
40 changes: 27 additions & 13 deletions ResponseInterceptor.kt
@@ -53,6 +53,13 @@ import java.time.ZonedDateTime
import kotlin.math.max
import kotlin.math.min

+ /**
+  * The ResponseInterceptor class modifies responses when the search API is triggered on rollup and live data
+  * 1. The class checks whether the request was rewritten in the RollupInterceptor into more granular buckets
+  * 2. In findOverlap it checks for overlap between live data and rollup data and returns the interval to include
+  * 3. computeAggregationsWithoutOverlap() iterates through the buckets and recomputes the aggregations in the expected format
+  * 4. Returns a new response for each shard to be combined later
+  **/
class ResponseInterceptor(
val clusterService: ClusterService,
val settings: Settings,
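The interval rule that the class summary describes can be made concrete with a standalone Kotlin sketch (hypothetical helper and parameter names, not code from this commit) of what findOverlap decides per shard:

// Ranges are epoch milliseconds, [start, end): start inclusive, end exclusive.
fun shardInterval(
    isRollupShard: Boolean,
    liveDataStartPoint: Long, // min timestamp across the live indices
    rollupDataEndPoint: Long, // max timestamp across the rollup indices
    rollupEndTime: Long       // last rollup bucket key <= liveDataStartPoint
): Pair<Long, Long> =
    if (liveDataStartPoint < rollupDataEndPoint && isRollupShard) {
        0L to rollupEndTime   // overlap: trim the rollup shard to pre-live data
    } else {
        0L to Long.MAX_VALUE  // live shard or no overlap: keep everything
    }

With live data starting at time t and rollup data ending after t, rollup shards report only [0, rollupEndTime) while live shards report everything, so each document contributes to the aggregation exactly once.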
@@ -126,7 +133,9 @@ class ResponseInterceptor(
return Pair(rollupIndices.toTypedArray(), liveIndices.toTypedArray())
}

-     // Calculated the end time for the current shard index if it is a rollup index with data overlapp
+     /**
+      * Calculates the end time for the current shard index if it is a rollup index with data overlap
+      **/
@Suppress("SpreadOperator")
suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
// Build search request to find the maximum rollup timestamp <= liveDataStartPoint
@@ -145,16 +154,19 @@
try {
return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long
} catch (e: Exception) {
logger.error("Not able to retrieve intersection time from response: $e")
logger.error("Not able to retrieve intersection time from response: ", e)
}
return 0L // dummy :P
}
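The request this method builds can be pictured with the following hedged sketch, using standard OpenSearch builder calls (the exact request in the committed method may differ). A rollup document stores its bucket timestamp under "<dateTargetField>.date_histogram", which is also the source field read back out of the first hit above:

import org.opensearch.action.search.SearchRequest
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortBuilders
import org.opensearch.search.sort.SortOrder

// Find the newest rollup bucket at or before liveDataStartPoint.
fun buildRollupEndTimeRequest(
    liveDataStartPoint: Long,
    rollupIndices: Array<String>,
    dateTargetField: String
): SearchRequest {
    val source = SearchSourceBuilder()
        .size(1) // only the single newest qualifying bucket is needed
        .query(QueryBuilders.rangeQuery("$dateTargetField.date_histogram").lte(liveDataStartPoint))
        .sort(SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC))
    return SearchRequest().indices(*rollupIndices).source(source)
}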

-     // Returns Pair(startRange: Long, endRange: Long)
-     // Note startRange is inclusive and endRange is exclusive, they are Longs because the type is epoch milliseconds
+     /**
+      * Returns Pair(startRange: Long, endRange: Long)
+      * Note: startRange is inclusive and endRange is exclusive; both are Longs because the values are epoch milliseconds
+      **/
// TODO intercept at the index level instead of the shard level to avoid redundant client calls for every index
@Suppress("LongMethod", "SpreadOperator")
suspend fun findOverlap(response: QuerySearchResult): Pair<Long, Long> {
-         val job: Rollup = getRollupJob(response)!! // maybe throw a try catch later
+         // TODO add more error logs and try catch statements for client calls
+         val job: Rollup = getRollupJob(response)!!
var dateSourceField: String = ""
var dateTargetField: String = ""
var rollupInterval: String? = ""
@@ -181,7 +193,7 @@
val maxRollupDateRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(*rollupIndices) // add all rollup indices to this request
logger.info("Sending maxRollupDate request for $shardRequestIndex")
logger.debug("Sending maxRollupDate request for $shardRequestIndex")
val maxRollupDateResponse: SearchResponse? = client.suspendUntil { search(maxRollupDateRequest, it) }
// Build search request to find the minimum date in all live indices
sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC)
@@ -200,7 +212,7 @@
} else { // shard index is live index
minLiveDateRequest.indices(shardRequestIndex)
}
logger.info("Sending minLiveData request for $shardRequestIndex")
logger.debug("Sending minLiveData request for $shardRequestIndex")
var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) }
val foundMinAndMax = (minLiveDateResponse != null && maxRollupDateResponse != null)
// if they overlap find part to exclude
@@ -213,7 +225,7 @@
// If intersection found on rollup index, remove overlap
if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) {
// Start at 0, end at live data
logger.info("ronsax sending request to find rollup endtime for index: $shardRequestIndex")
logger.debug("Sending request to find rollup endtime for index: $shardRequestIndex")
val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField)
return Pair(0L, endTime)
}
@@ -222,7 +234,7 @@
return Pair(0L, Long.MAX_VALUE)
}

-     // Depending on which metric the aggregation is computer data differently
+     // The running value is computed differently depending on the aggregation's metric
@Suppress("ReturnCount")
fun computeRunningValue(agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any): Pair<Any, String> {
when (agg) {
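A toy reduction showing the per-metric behavior the comment above refers to (hypothetical helper on plain doubles; the committed method dispatches on the actual Aggregation subtypes instead of strings):

fun reduceMetric(metric: String, running: Double, next: Double): Double = when (metric) {
    "sum", "value_count" -> running + next // additive metrics accumulate
    "min" -> minOf(running, next)          // keep the smallest value seen
    "max" -> maxOf(running, next)          // keep the largest value seen
    else -> error("unsupported metric: $metric")
}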
@@ -289,7 +301,7 @@
}
}
} else { // Value count calc
-                 // Won't double count
+                 // Put in set to avoid adding the aggregation twice
addedAggregations += modifiedName
val originalName = modifiedName.removeSuffix(".rollup.avg.value_count")
val avgCount = value as Long
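This hunk is the value_count half of the avg computation: a rollup average cannot be merged directly, so it is rebuilt from the running sum and the running value_count once both halves have been folded in. A minimal sketch of that final step (hypothetical helper, not the committed code):

fun recomputeAvg(runningSum: Double, runningValueCount: Long): Double {
    require(runningValueCount > 0L) { "avg undefined for zero documents" }
    return runningSum / runningValueCount
}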
@@ -305,7 +317,9 @@
throw NullPointerException("Can't calculate avg agg for rollup index")
}

-     // Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed
+     /**
+      * Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed
+      */
@Suppress("NestedBlockDepth")
fun computeAggregationsWithoutOverlap(intervalAggregations: InternalAggregations, start: Long, end: Long): InternalAggregations {
// Store the running values of the aggregations being computed
@@ -333,7 +347,7 @@
}
}

-         // Create a new InternalAggregations with sum values
+         // Create a new InternalAggregations with recomputed values discarding the overlap
val allAggregations = mutableListOf<InternalAggregation>()
val addedAggregations = mutableSetOf<String>() // avoid repeating the same aggregations
for ((aggName, data) in aggValues) {
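Taken together, the loop above amounts to the bucket-filtering-and-folding idea in this self-contained toy (hypothetical types; the real method walks InternalAggregations and date-histogram buckets rather than plain maps):

data class Bucket(val keyMillis: Long, val sums: Map<String, Double>)

// Keep only buckets whose key falls inside [start, end) and fold their values.
fun mergeWithoutOverlap(buckets: List<Bucket>, start: Long, end: Long): Map<String, Double> {
    val running = mutableMapOf<String, Double>()
    for (bucket in buckets) {
        if (bucket.keyMillis < start || bucket.keyMillis >= end) continue // in overlap: skip
        for ((name, value) in bucket.sums) {
            running[name] = (running[name] ?: 0.0) + value // sum-style accumulation
        }
    }
    return running
}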
