-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rollup Search API : Searching both historical rollup and non-rollup data #901
Rollup Search API : Searching both historical rollup and non-rollup data #901
Conversation
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt
Outdated
Show resolved
Hide resolved
return false | ||
} | ||
// Helper fn to avoid rewritting a rollup request an extra time | ||
fun isReqeustRollupFormat(request: ShardSearchRequest): Boolean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isRequest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function check if the request was rewritten to match the rollup index data. I do not think isRequest is descriptive enough for its purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intent to say there's a typo here...
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt
Outdated
Show resolved
Hide resolved
// Avoid infinite interceptor loop | ||
if (concreteRollupIndicesArray.isNotEmpty() && concreteLiveIndicesArray.isNotEmpty() && request.source().aggregations() != null && !isRequestRewrittenIntoBuckets(request)) { | ||
// Break apart request to remove overlapping parts | ||
breakRequestIntoBuckets(request, rollupJob!!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misleading name, you are adding one more bucket aggregation to the request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bowenlan-amzn I add another aggregation but wrap the existing aggregations into a sub aggregation. Would a more appropriate name be putAggregationsIntoBuckets() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just breakIntoBuckets
// Handle the response if it came from interceptor | ||
when (response) { | ||
// live index | ||
is QuerySearchResult -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why live index is QuerySearchResult, but for rollup index is QueryFetchSearchResult, why the difference here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this. I kept it for testing by sending a request with the aggregation name as "interceptor_interval_data" I can check if the behavior in the response interceptor is correct. (The response would be a QueryFetchSearchReuslt) on one rollup index)
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
client.search( | ||
maxRolledDateRequest, | ||
object : ActionListener<SearchResponse> { | ||
override fun onResponse(searchResponse: SearchResponse) { | ||
maxRolledDateResponse = searchResponse | ||
latch.countDown() | ||
} | ||
|
||
override fun onFailure(e: Exception) { | ||
logger.error("ronsax maxLiveDate request failed ", e) | ||
latch.countDown() | ||
} | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want this client call to be executed for each shard response
So investigate if you can save the result of this call to a variable in this class ResponseInterceptor, and reuse for other shards response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will look into this if I am time. I think a core modification will be needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets add unit tests at the class level to be able to verify helper/util and class methods
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
…gation Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Now we can see this change broken data stream scenario https://github.com/opensearch-project/index-management/actions/runs/6125162603/job/16626572043?pr=901
|
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
Signed-off-by: Ronnak Saxena <[email protected]>
Signed-off-by: Ronnak Saxena <[email protected]>
Codecov Report
@@ Coverage Diff @@
## feature/rollup-and-live-search-2.9 #901 +/- ##
========================================================================
+ Coverage 75.88% 76.19% +0.31%
- Complexity 2899 2933 +34
========================================================================
Files 366 367 +1
Lines 16624 16946 +322
Branches 2411 2503 +92
========================================================================
+ Hits 12615 12912 +297
+ Misses 2623 2599 -24
- Partials 1386 1435 +49
|
Signed-off-by: Ronnak Saxena <[email protected]>
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
Can you add TODOs on the client calls made from within the shard level interceptor as this would imply for one rollup search request, we are making n more search requests where n is number of shards and that would have performance implications. We can merge this PR and continue to work on it. |
Signed-off-by: Ronnak Saxena <[email protected]>
throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") | ||
val isDataStream = (request.indices().any { IndexUtils.isDataStream(it, clusterService.state()) }) | ||
val shardRequestIndex = request.shardId().indexName | ||
// isRollupIndex throws an exception if the request is on a data stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the exception found out here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a null ptr exception when checking is the shardId indexName for a data stream is a rollup index
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt
Outdated
Show resolved
Hide resolved
return false | ||
} | ||
// Helper fn to avoid rewritting a rollup request an extra time | ||
fun isReqeustRollupFormat(request: ShardSearchRequest): Boolean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intent to say there's a typo here...
val isRollupIndex = if (isDataStream) false else isRollupIndex(shardRequestIndex, clusterService.state()) | ||
val (containsRollup, rollupJob) = if (isDataStream) Pair(false, null) else originalSearchContainsRollup(request) | ||
// Only modifies rollup searches and avoids internal client calls | ||
if (containsRollup || isRollupIndex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need || isRollupIndex
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this check is needed. The base case fails unless I check for both but maybe the logic could be cleaned up to have less if checks
// Avoid infinite interceptor loop | ||
if (concreteRollupIndicesArray.isNotEmpty() && concreteLiveIndicesArray.isNotEmpty() && request.source().aggregations() != null && !isRequestRewrittenIntoBuckets(request)) { | ||
// Break apart request to remove overlapping parts | ||
breakRequestIntoBuckets(request, rollupJob!!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just breakIntoBuckets
// Rewrite the request to fit rollup format if not already done previously | ||
if (isRollupIndex && !isReqeustRollupFormat(request)) { | ||
/* Client calls from the response interceptor require request bodies of 1, | ||
otherwise do not allow size > 0 for rollup indices | ||
*/ | ||
if (!canHaveSize(request) && request.source().size() != 0) { | ||
throw IllegalArgumentException( | ||
"Rollup search must have size explicitly set to 0, " + | ||
"but found ${request.source().size()}" | ||
) | ||
} | ||
rewriteRollupRequest(request, rollupJob!!, concreteRollupIndicesArray) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be moved out from the parent if condition if (containsRollup
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I cannot I need the concreteLiveIndices array to call the rewriteRollupRequest helper function
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt
Show resolved
Hide resolved
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
@@ -3,7 +3,7 @@ name: Multi node test workflow | |||
on: | |||
pull_request: | |||
branches: | |||
- "*" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove this after PR is approved.
src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt
Outdated
Show resolved
Hide resolved
Signed-off-by: Ronnak Saxena <[email protected]>
@bowenlan-amzn @eirsep Thank you guys for the feedback. I pushed a new commit with suggested changes. 👍🏽 |
577d333
into
opensearch-project:feature/rollup-and-live-search-2.9
This feature supports searching both historical rollup and non-rollup data
To use the endpoint is
GET <rollup_index>,<live_index1>, <live_index2>, <live_indexX>/_search
Currently supports:
It works like this:
The type of use cases that could arise are:
Base Case: No data overlap between rollup indices and live data indices
GET /rollup-data,live-data/_search
Case 2: Overlapping Data Between 1 Rollup index and 1 Live index
Attempting to handle this use case from my design doc:
Flow for this code would look like:
Case 3: Overlapping data between multiple (aliased) live data indices
Can add a comma separated list of all index names or index alias/template
GET /rollup-data,live-data-alias/_search
CheckList:
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.