Skip to content

Commit

Permalink
removed countdown latch and added coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Sep 12, 2023
1 parent 35ee9e6 commit 06b7b02
Showing 1 changed file with 29 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package org.opensearch.indexmanagement.rollup.interceptor

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
Expand All @@ -24,6 +28,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.internal.ShardSearchRequest
import org.opensearch.search.query.QuerySearchResult
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.util.convertDateStringToEpochMillis
import org.opensearch.indexmanagement.rollup.util.convertFixedIntervalStringToMs
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
Expand All @@ -45,7 +50,6 @@ import org.opensearch.transport.TransportRequestOptions
import org.opensearch.transport.TransportResponse
import org.opensearch.transport.TransportResponseHandler
import java.time.ZonedDateTime
import java.util.concurrent.CountDownLatch
import kotlin.math.max
import kotlin.math.min

Expand All @@ -54,7 +58,8 @@ class ResponseInterceptor(
val settings: Settings,
val indexNameExpressionResolver: IndexNameExpressionResolver,
val client: Client
) : TransportInterceptor {
) : TransportInterceptor,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("Rollup Response Interceptor")) {
private val logger = LogManager.getLogger(javaClass)
override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender {
return CustomAsyncSender(sender)
Expand Down Expand Up @@ -123,7 +128,7 @@ class ResponseInterceptor(

// Calculated the end time for the current shard index if it is a rollup index with data overlapp
@Suppress("SpreadOperator")
fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
// Build search request to find the maximum rollup timestamp <= liveDataStartPoint
val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC)
val query = QueryBuilders.boolQuery()
Expand All @@ -136,23 +141,7 @@ class ResponseInterceptor(
val req = SearchRequest()
.source(searchSourceBuilder)
.indices(*rollupIndices)
var res: SearchResponse? = null
val latch = CountDownLatch(1)
client.search(
req,
object : ActionListener<SearchResponse> {
override fun onResponse(searchResponse: SearchResponse) {
res = searchResponse
latch.countDown()
}

override fun onFailure(e: Exception) {
logger.error("request to find intersection time failed :(", e)
latch.countDown()
}
}
)
latch.await()
val res = client.suspendUntil { search(req, it) }
try {
return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long
} catch (e: Exception) {
Expand All @@ -164,7 +153,7 @@ class ResponseInterceptor(
// Returns Pair(startRange: Long, endRange: Long)
// Note startRange is inclusive and endRange is exclusive, they are Longs because the type is epoch milliseconds
@Suppress("LongMethod", "SpreadOperator")
fun findOverlap(response: QuerySearchResult): Pair<Long, Long> {
suspend fun findOverlap(response: QuerySearchResult): Pair<Long, Long> {
val job: Rollup = getRollupJob(response)!! // maybe throw a try catch later
var dateSourceField: String = ""
var dateTargetField: String = ""
Expand All @@ -189,27 +178,11 @@ class ResponseInterceptor(
.query(oldQuery)
.size(1)
// Need to avoid infinite interceptor loop
val maxRolledDateRequest = SearchRequest()
val maxRollupDateRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(*rollupIndices) // add all rollup indices to this request
var maxRolledDateResponse: SearchResponse? = null
var latch = CountDownLatch(1)
logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex")
client.search(
maxRolledDateRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(searchResponse: SearchResponse) {
maxRolledDateResponse = searchResponse
latch.countDown()
}

override fun onFailure(e: Exception) {
logger.error("maxLiveDate request failed in response interceptor", e)
latch.countDown()
}
}
)
latch.await()
logger.info("Sending maxRollupDate request for $shardRequestIndex")
val maxRollupDateResponse: SearchResponse? = client.suspendUntil { search(maxRollupDateRequest, it) }
// Build search request to find the minimum date in all live indices
sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC)
searchSourceBuilder = SearchSourceBuilder()
Expand All @@ -227,32 +200,15 @@ class ResponseInterceptor(
} else { // shard index is live index
minLiveDateRequest.indices(shardRequestIndex)
}

var minLiveDateResponse: SearchResponse? = null
latch = CountDownLatch(1)
logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex")
client.search(
minLiveDateRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(searchResponse: SearchResponse) {
minLiveDateResponse = searchResponse
latch.countDown()
}

override fun onFailure(e: Exception) {
logger.error("minLiveDate request failed in response interceptor", e)
latch.countDown()
}
}
)
latch.await()
val foundMinAndMax = (minLiveDateResponse != null && maxRolledDateResponse != null)
logger.info("Sending minLiveData request for $shardRequestIndex")
var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) }
val foundMinAndMax = (minLiveDateResponse != null && maxRollupDateResponse != null)
// if they overlap find part to exclude
if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRolledDateResponse!!.hits.hits.isNotEmpty()) {
if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRollupDateResponse!!.hits.hits.isNotEmpty()) {
// Rollup data ends at maxRolledDate + fixedInterval
val maxRolledDate: Long = maxRolledDateResponse!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long
val maxRolledDate: Long = maxRollupDateResponse.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long
val rollupDataEndPoint = maxRolledDate + convertFixedIntervalStringToMs(fixedInterval = rollupInterval!!)
val minLiveDate = minLiveDateResponse!!.hits.hits[0].sourceAsMap.get("$dateSourceField") as String
val minLiveDate = minLiveDateResponse.hits.hits[0].sourceAsMap.get("$dateSourceField") as String
val liveDataStartPoint = convertDateStringToEpochMillis(minLiveDate)
// If intersection found on rollup index, remove overlap
if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) {
Expand Down Expand Up @@ -355,8 +311,8 @@ class ResponseInterceptor(
// Store the running values of the aggregations being computed
// {aggName: String: Pair<value: Any, type:String>}
val aggValues = mutableMapOf<String, Pair<Any, String>>()
//
// // Iterate through each aggregation and bucket

// Iterate through each aggregation and bucket
val interceptorAgg = intervalAggregations.asMap().get("interceptor_interval_data") as InternalDateHistogram
for (bucket in interceptorAgg.buckets) {
val zdt = bucket.key as ZonedDateTime
Expand Down Expand Up @@ -409,11 +365,13 @@ class ResponseInterceptor(
// live index
is QuerySearchResult -> {
if (response.hasAggs() && isRewrittenInterceptorRequest(response)) {
// Check for overlap
val (startTime, endTime) = findOverlap(response)
// Modify agg to be original result without overlap computed in
response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime))
originalHandler?.handleResponse(response)
launch {
// Check for overlap
val (startTime, endTime) = findOverlap(response)
// Modify agg to be original result without overlap computed in
response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime))
originalHandler?.handleResponse(response)
}
} else {
originalHandler?.handleResponse(response)
}
Expand Down

0 comments on commit 06b7b02

Please sign in to comment.