Skip to content

Commit

Permalink
resolved PR comments added kotlin docs for methods
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Sep 14, 2023
1 parent acf1c4b commit 9c9d242
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import kotlin.math.min

/**
* The Response Interceptor class modifies responses if the search API is triggered on rollup and live data
* 1. The class check if the request was rewritten in the RollupInterceptor into more granular buckets
* 1. The class checks if the request was rewritten in the RollupInterceptor into more granular buckets
* 2. in findOverlap it checks for overlap between live data and rollup data and returns the interval to include
* 3. computeAggregationsWithoutOverlap() iterates through the buckets and recomputes the aggregations in the expected format
* 4. Returns a new response for each shard to be combined later
Expand All @@ -73,12 +73,12 @@ class ResponseInterceptor(
}

private inner class CustomAsyncSender(private val originalSender: TransportInterceptor.AsyncSender) : TransportInterceptor.AsyncSender {
override fun <T : TransportResponse?> sendRequest(
connection: Transport.Connection?,
action: String?,
request: TransportRequest?,
options: TransportRequestOptions?,
handler: TransportResponseHandler<T>?
override fun <T : TransportResponse> sendRequest(
connection: Transport.Connection,
action: String,
request: TransportRequest,
options: TransportRequestOptions,
handler: TransportResponseHandler<T>
) {
val interceptedHandler = CustomResponseHandler(handler)

Expand All @@ -94,6 +94,13 @@ class ResponseInterceptor(
val response = originalHandler?.read(inStream)
return response!!
}

/**
* Check if this response was modified in the request interceptor
* and should be put back together
* @param QuerySearchResult
* @return Boolean
*/
fun isRewrittenInterceptorRequest(response: QuerySearchResult): Boolean {
val currentAggregations = response.aggregations().expand()
for (agg in currentAggregations) {
Expand Down Expand Up @@ -135,6 +142,8 @@ class ResponseInterceptor(

/**
* Calculates the end time for the current shard index if it is a rollup index with data overlap
* @params liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String
* @return Long
**/
@Suppress("SpreadOperator")
suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
Expand All @@ -159,8 +168,10 @@ class ResponseInterceptor(
return 0L // dummy :P
}
/**
* Returns Pair(startRange: Long, endRange: Long)
* Checks for overlap in timeseries data and returns the non overlapping interval to include
* Note: startRange is inclusive and endRange is exclusive; they are Longs because they represent epoch milliseconds
* @param QuerySearchResult
* @return Pair(startRange: Long, endRange: Long)
**/
// TODO intercept at the index level instead of the shard level to avoid redundant client calls for every index
@Suppress("LongMethod", "SpreadOperator")
Expand Down Expand Up @@ -234,7 +245,11 @@ class ResponseInterceptor(
return Pair(0L, Long.MAX_VALUE)
}

// Depending on which metric the aggregation is, computation is different
/**
* Depending on which metric the aggregation is, computation is different
* @params agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any
* @return Pair<Any, String>
*/
@Suppress("ReturnCount")
fun computeRunningValue(agg: org.opensearch.search.aggregations.Aggregation, currentValue: Any): Pair<Any, String> {
when (agg) {
Expand Down Expand Up @@ -279,7 +294,11 @@ class ResponseInterceptor(
else -> throw IllegalArgumentException("Could not recreate an aggregation for type $aggType")
}
}
// Create original avg aggregation
/**
* Create original avg aggregation
* @return InternalAvg
*/

fun initRollupAvgAgg(
modifiedName: String,
value: Any,
Expand Down Expand Up @@ -319,6 +338,8 @@ class ResponseInterceptor(

/**
* Returns a new InternalAggregations that contains merged aggregation(s) with the overlapping data removed
* @params intervalAggregations: InternalAggregations, start: Long, end: Long
* @return InternalAggregations
*/
@Suppress("NestedBlockDepth")
fun computeAggregationsWithoutOverlap(intervalAggregations: InternalAggregations, start: Long, end: Long): InternalAggregations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ class RollupInterceptor(
searchAllJobs = it
}
}
// Returns Pair<containsRollup: Boolean, rollupJob: RollupJob>

/**
* Checks if any of the indices in the original request contain a rollup index
* @param ShardSearchRequest
* @return Pair<containsRollup: Boolean, rollupJob: RollupJob>
*/
@Suppress("SpreadOperator")
private fun originalSearchContainsRollup(request: ShardSearchRequest): Pair<Boolean, Rollup?> { // Throwing an error on data streams
val indices = request.indices().map { it.toString() }.toTypedArray()
Expand All @@ -89,7 +94,11 @@ class RollupInterceptor(
}
return Pair(false, null)
}
// Returns true if request was already modified into "interceptor_interval_data" bucket aggregation
/**
* Returns true if request was already modified into "interceptor_interval_data" bucket aggregation
* @param ShardSearchRequest
* @return Boolean
*/
fun isRequestRewrittenIntoBuckets(request: ShardSearchRequest): Boolean {
val currentAggs = request.source().aggregations().aggregatorFactories
if (currentAggs != null) {
Expand All @@ -101,8 +110,12 @@ class RollupInterceptor(
}
return false
}
// Helper fn to avoid rewritting a rollup request an extra time
fun isReqeustRollupFormat(request: ShardSearchRequest): Boolean {
/**
* Helper function to avoid rewriting a rollup request an extra time
* @param ShardSearchRequest
* @return Boolean
*/
fun isRequestRollupFormat(request: ShardSearchRequest): Boolean {
if (request.source().query() != null) {
val jsonRequest: String = request.source().query().toString()
// Detected dummy field from internal search request
Expand All @@ -112,11 +125,28 @@ class RollupInterceptor(
}
return false
}
// If the request has a sort on it, size can be > 0 on a rollup search
fun canHaveSize(request: ShardSearchRequest): Boolean {

/**
* If the request has a sort on it, size can be > 0 on a rollup search
* Context: we need to make a call to find out the min and max times of the rollup and live indexes;
* however, that call would itself be intercepted here and rewritten.
*
* So the idea is to explicitly allow such calls to pass through the rollup interceptor without rewriting them,
* so that whether the call comes from the response interceptor or a client, both work.
* @param ShardSearchRequest
* @return Boolean
*/
// A rollup search with a sort attached is the interceptor's own min/max probe
// (or an equivalent client call) and must be let through without rewriting.
fun allowRequest(request: ShardSearchRequest): Boolean = request.source().sorts() != null
// Need to modify aggs for rollup docs with avg and value count aggs
/**
* Need to modify aggs for rollup docs with avg and value count aggs
* Context: in order to recompute the avg metric in the response interceptor,
* the avg request must be modified into a sum and a value count request;
* the ResponseInterceptor then puts the aggs back into an avg aggregation.
* @param MutableCollection<AggregationBuilder>
* @return AggregatorFactories.Builder
*/
fun modifyRollupAggs(aggFacts: MutableCollection<AggregationBuilder>): AggregatorFactories.Builder {
val build = AggregatorFactories.builder()
for (agg in aggFacts) {
Expand Down Expand Up @@ -155,8 +185,12 @@ class RollupInterceptor(
return build
}

// Wrap original aggregations into buckets based on fixed interval to remove overlap in response interceptor
fun breakRequestIntoBuckets(request: ShardSearchRequest, rollupJob: Rollup) {
/**
* Wrap original aggregations into a bucket aggregation based on a fixed interval
* to make the response more granular and remove overlap in the response interceptor
* @params request: ShardSearchRequest, rollupJob: Rollup
*/
fun breakIntoBuckets(request: ShardSearchRequest, rollupJob: Rollup) {
val oldAggs = modifyRollupAggs(request.source().aggregations().aggregatorFactories)
var dateSourceField: String = ""
var rollupInterval: String = ""
Expand Down Expand Up @@ -205,14 +239,14 @@ class RollupInterceptor(
val isMultiSearch = (concreteRollupIndicesArray.isNotEmpty() && concreteLiveIndicesArray.isNotEmpty())
if (isMultiSearch && request.source().aggregations() != null && !isRequestRewrittenIntoBuckets(request)) {
// Break apart request to remove overlapping parts
breakRequestIntoBuckets(request, rollupJob!!)
breakIntoBuckets(request, rollupJob!!)
}
// Rewrite the request to fit rollup format if not already done previously
if (isRollupIndex && !isReqeustRollupFormat(request)) {
if (isRollupIndex && !isRequestRollupFormat(request)) {
/* Client calls from the response interceptor require request bodies of 1,
otherwise do not allow size > 0 for rollup indices
*/
if (!canHaveSize(request) && request.source().size() != 0) {
if (!allowRequest(request) && request.source().size() != 0) {
throw IllegalArgumentException(
"Rollup search must have size explicitly set to 0, " +
"but found ${request.source().size()}"
Expand All @@ -226,7 +260,10 @@ class RollupInterceptor(
}
}
}
// Returns Pair (concreteRollupIndices: Array<String>, concreteLiveIndicesArray: Array<String>)

/**
* @return Pair (concreteRollupIndices: Array<String>, concreteLiveIndicesArray: Array<String>)
*/
@Suppress("SpreadOperator")
fun getConcreteIndices(request: ShardSearchRequest): Pair<Array<String>, Array<String>> {
val indices = request.indices().map { it.toString() }.toTypedArray()
Expand All @@ -245,6 +282,10 @@ class RollupInterceptor(
val concreteLiveIndicesArray = concreteLiveIndexNames.toTypedArray()
return Pair(concreteRollupIndicesArray, concreteLiveIndicesArray)
}

/**
* Modifies ShardSearchRequest to fit rollup index format
*/
fun rewriteRollupRequest(request: ShardSearchRequest, rollupJob: Rollup, concreteRollupIndicesArray: Array<String>) {
// To extract fields from QueryStringQueryBuilder we need concrete source index name.
val queryFieldMappings = getQueryMetadata(
Expand Down

0 comments on commit 9c9d242

Please sign in to comment.