diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt index 389bc6628..355fdc1c3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt @@ -126,8 +126,12 @@ class ResponseInterceptor( fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array, dateTargetField: String): Long { // Build search request to find the maximum rollup timestamp <= liveDataStartPoint val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) + // Add the 3 match alls to detect this was an internal client call in the request interceptor val query = QueryBuilders.boolQuery() .must(QueryBuilders.rangeQuery(dateTargetField).lte(liveDataStartPoint)) + .must(QueryBuilders.matchAllQuery()) + .must(QueryBuilders.matchAllQuery()) + .must(QueryBuilders.matchAllQuery()) val searchSourceBuilder = SearchSourceBuilder() .sort(sort) .query(query) @@ -179,6 +183,11 @@ class ResponseInterceptor( } val request: ShardSearchRequest = response.shardSearchRequest!! val oldQuery = request.source().query() + val newQuery = QueryBuilders.boolQuery() + .must(oldQuery) + .must(QueryBuilders.matchAllQuery()) + .must(QueryBuilders.matchAllQuery()) + .must(QueryBuilders.matchAllQuery()) val (rollupIndices, liveIndices) = getRollupAndLiveIndices(request) val shardRequestIndex = request.shardId().indexName val isShardIndexRollup = isRollupIndex(shardRequestIndex, clusterService.state()) @@ -186,7 +195,7 @@ class ResponseInterceptor( var sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) var searchSourceBuilder = SearchSourceBuilder() .sort(sort) - .query(oldQuery) + .query(newQuery) .size(1) // Need to avoid infinite interceptor loop val maxRolledDateRequest = SearchRequest() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index e7438a4b1..4880898fc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -177,7 +177,34 @@ class RollupInterceptor( request.source(request.source().changeAggregations(listOf(intervalAgg))) return } + // Detects if this was an internal client call to allow a size of 0 on a rollup search + fun requestCalledInInterceptor(request: ShardSearchRequest): Boolean { + val jsonRequest: String = request.source().query().toString() + // Detected dummy field from internal search request + if (jsonRequest.contains( + ",\n" + + " {\n" + + " \"match_all\" : {\n" + + " \"boost\" : 1.0\n" + + " }\n" + + " },\n" + + " {\n" + + " \"match_all\" : {\n" + + " \"boost\" : 1.0\n" + + " }\n" + + " },\n" + + " {\n" + + " \"match_all\" : {\n" + + " \"boost\" : 1.0\n" + + " }\n" + + " }" + ) + ) { + return true + } + return false + } @Suppress("SpreadOperator") override fun interceptHandler( action: String, @@ -202,11 +229,13 @@ class RollupInterceptor( } // Rewrite the request to fit rollup format if not already done previously if (isRollupIndex && !isReqeustRollupFormat(request)) { - // TODO fix logic to allow response interceptor client calls to have a size of 1 -// if (!requestCalledInInterceptor && request.source().size() != 0) { -// throw IllegalArgumentException("Rollup search must have size explicitly set to 0, " + -// "but found ${request.source().size()}") -// } +// TODO fix logic to allow response interceptor client calls to have a size of 1 + if (!requestCalledInInterceptor(request) && request.source().size() != 0) { + throw IllegalArgumentException( + "Rollup search must have size explicitly set to 0, " + + "but found ${request.source().size()}" + ) + } rewriteRollupRequest(request, rollupJob!!, concreteRollupIndicesArray) } }