Skip to content

Commit

Permalink
added more debug logs
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Sep 12, 2023
1 parent f9b1f9c commit aaeaaec
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class ResponseInterceptor(

// Calculated the end time for the current shard index if it is a rollup index with data overlapp
@Suppress("SpreadOperator")
fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String): Long {
fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array<String>, dateTargetField: String, shardRequestIndex: String): Long {
// Build search request to find the maximum rollup timestamp <= liveDataStartPoint
val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC)
val query = QueryBuilders.boolQuery()
Expand All @@ -144,6 +144,7 @@ class ResponseInterceptor(
override fun onResponse(searchResponse: SearchResponse) {
res = searchResponse
latch.countDown()
logger.info("ronsax find rollup endtime for index $shardRequestIndex latch is ${latch.count}")
}

override fun onFailure(e: Exception) {
Expand Down Expand Up @@ -194,13 +195,14 @@ class ResponseInterceptor(
.indices(*rollupIndices) // add all rollup indices to this request
var maxRolledDateResponse: SearchResponse? = null
var latch = CountDownLatch(1)
logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex")
logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex latch is ${latch.count}")
client.search(
maxRolledDateRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(searchResponse: SearchResponse) {
maxRolledDateResponse = searchResponse
latch.countDown()
logger.info("ronsax found max rollup time for index: $shardRequestIndex latch is ${latch.count}")
}

override fun onFailure(e: Exception) {
Expand Down Expand Up @@ -230,13 +232,14 @@ class ResponseInterceptor(

var minLiveDateResponse: SearchResponse? = null
latch = CountDownLatch(1)
logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex")
logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex latch is ${latch.count}")
client.search(
minLiveDateRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(searchResponse: SearchResponse) {
minLiveDateResponse = searchResponse
latch.countDown()
logger.info("ronsax found minLiveData for index: $shardRequestIndex latch is ${latch.count}")
}

override fun onFailure(e: Exception) {
Expand All @@ -258,7 +261,7 @@ class ResponseInterceptor(
if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) {
// Start at 0, end at live data
logger.info("ronsax sending request to find rollup endtime for index: $shardRequestIndex")
val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField)
val endTime = getRollupEndTime(liveDataStartPoint, rollupIndices, dateTargetField, shardRequestIndex)
return Pair(0L, endTime)
}
}
Expand Down

0 comments on commit aaeaaec

Please sign in to comment.