Skip to content

Commit

Permalink
Rollup Search API : Searching both historical rollup and non-rollup d…
Browse files Browse the repository at this point in the history
…ata (#901)

* added response interceptor

Signed-off-by: Ronnak Saxena <[email protected]>

* Base case: Query Live and Rollup data with no overlap

Signed-off-by: Ronnak Saxena <[email protected]>

* finished base case and added integ test

Signed-off-by: Ronnak Saxena <[email protected]>

* added to response interceptor

Signed-off-by: Ronnak Saxena <[email protected]>

* can rewrite request to bucket pipeline

Signed-off-by: Ronnak Saxena <[email protected]>

* trying to rewrite aggregations in a helper function

Signed-off-by: Ronnak Saxena <[email protected]>

* able to create new aggreations but getting shardIndex is not set error

Signed-off-by: Ronnak Saxena <[email protected]>

* Can find start and end times for rollup and live index

Signed-off-by: Ronnak Saxena <[email protected]>

* Handles overlap between 1 live index and 1 rollup index for sum aggregation

Signed-off-by: Ronnak Saxena <[email protected]>

* added min max aggregations and fixed intersection time calculation

Signed-off-by: Ronnak Saxena <[email protected]>

* changed variable name in computeAggregationsWithoutOverlap

Signed-off-by: Ronnak Saxena <[email protected]>

* Added integ tests for nonoverlapping case

Signed-off-by: Ronnak Saxena <[email protected]>

* added avg and value count aggregation

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed ktlint and integ test

Signed-off-by: Ronnak Saxena <[email protected]>

* changed test and build workflow

Signed-off-by: Ronnak Saxena <[email protected]>

* added integ test for multiple live indices

Signed-off-by: Ronnak Saxena <[email protected]>

* added test case for alias live indices

Signed-off-by: Ronnak Saxena <[email protected]>

* cleaned up code and moved functions to utils file

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed detekt errors

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed ktlint error :/'

Signed-off-by: Ronnak Saxena <[email protected]>

* Can run all integ tests at once now

Signed-off-by: Ronnak Saxena <[email protected]>

* removed DateTimeFromatter

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed inf interceptor loop, need to pass RollupInterceptorIT.test rollup search multiple target indices failed test

Signed-off-by: Ronnak Saxena <[email protected]>

* passes all integ tests

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed detekt errors

Signed-off-by: Ronnak Saxena <[email protected]>

* deleted rest test

Signed-off-by: Ronnak Saxena <[email protected]>

* added test back

Signed-off-by: Ronnak Saxena <[email protected]>

* trying a new workflow build

Signed-off-by: Ronnak Saxena <[email protected]>

* added stars to worklow files

Signed-off-by: Ronnak Saxena <[email protected]>

* added unit test

Signed-off-by: Ronnak Saxena <[email protected]>

* resolved some PR comments from Bowen

Signed-off-by: Ronnak Saxena <[email protected]>

* resolved more comments on my PR

Signed-off-by: Ronnak Saxena <[email protected]>

* removed stars from workflows

Signed-off-by: Ronnak Saxena <[email protected]>

* testing time of alias test case

Signed-off-by: Ronnak Saxena <[email protected]>

* boring, took too long

Signed-off-by: Ronnak Saxena <[email protected]>

* commented out last 2 tests

Signed-off-by: Ronnak Saxena <[email protected]>

* removed all response interceptor tests

Signed-off-by: Ronnak Saxena <[email protected]>

* added one test back

Signed-off-by: Ronnak Saxena <[email protected]>

* fixed data stream integ tests

Signed-off-by: Ronnak Saxena <[email protected]>

* commented out breaking tests

Signed-off-by: Ronnak Saxena <[email protected]>

* added tests back

Signed-off-by: Ronnak Saxena <[email protected]>

* removed countdown latch and added coroutines

Signed-off-by: Ronnak Saxena <[email protected]>

* resolved comments on the PR

Signed-off-by: Ronnak Saxena <[email protected]>

* resolved PR comments added kotlin docs for methods

Signed-off-by: Ronnak Saxena <[email protected]>

---------

Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena authored Sep 30, 2023
1 parent 1923003 commit 577d333
Show file tree
Hide file tree
Showing 10 changed files with 1,556 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Multi node test workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Test and Build Workflow
on:
pull_request:
branches:
- "*"
- "**"
push:
branches:
- "*"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAc
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction
import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter
import org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptor
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
Expand Down Expand Up @@ -208,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
lateinit var responseInterceptor: ResponseInterceptor
lateinit var fieldCapsFilter: FieldCapsFilter
lateinit var indexMetadataProvider: IndexMetadataProvider
private val indexMetadataServices: MutableList<Map<String, IndexMetadataService>> = mutableListOf()
Expand Down Expand Up @@ -391,6 +393,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
environment
)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
responseInterceptor = ResponseInterceptor(clusterService, settings, indexNameExpressionResolver, client)
val jvmService = JvmService(environment.settings())
val transformRunner = TransformRunner.initialize(
client,
Expand Down Expand Up @@ -612,7 +615,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
}

override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List<TransportInterceptor> {
return listOf(rollupInterceptor)
return listOf(rollupInterceptor, responseInterceptor)
}

override fun getActionFilters(): List<ActionFilter> {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuild
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"
Expand Down Expand Up @@ -464,3 +467,83 @@ fun parseRollup(response: GetResponse, xContentRegistry: NamedXContentRegistry =

return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse)
}
// Returns a SearchSourceBuilder with different aggregations but the rest of the properties are the same
@Suppress("ComplexMethod")
fun SearchSourceBuilder.changeAggregations(aggregationBuilderCollection: Collection<AggregationBuilder>): SearchSourceBuilder {
val ssb = SearchSourceBuilder()
aggregationBuilderCollection.forEach { ssb.aggregation(it) }
if (this.explain() != null) ssb.explain(this.explain())
if (this.ext() != null) ssb.ext(this.ext())
ssb.fetchSource(this.fetchSource())
this.docValueFields()?.forEach { ssb.docValueField(it.field, it.format) }
ssb.storedFields(this.storedFields())
if (this.from() >= 0) ssb.from(this.from())
ssb.highlighter(this.highlighter())
this.indexBoosts()?.forEach { ssb.indexBoost(it.index, it.boost) }
if (this.minScore() != null) ssb.minScore(this.minScore())
if (this.postFilter() != null) ssb.postFilter(this.postFilter())
ssb.profile(this.profile())
if (this.query() != null) ssb.query(this.query())
this.rescores()?.forEach { ssb.addRescorer(it) }
this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) }
if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter())
if (this.slice() != null) ssb.slice(this.slice())
if (this.size() >= 0) ssb.size(this.size())
this.sorts()?.forEach { ssb.sort(it) }
if (this.stats() != null) ssb.stats(this.stats())
if (this.suggest() != null) ssb.suggest(this.suggest())
if (this.terminateAfter() >= 0) ssb.terminateAfter(this.terminateAfter())
if (this.timeout() != null) ssb.timeout(this.timeout())
ssb.trackScores(this.trackScores())
this.trackTotalHitsUpTo()?.let { ssb.trackTotalHitsUpTo(it) }
if (this.version() != null) ssb.version(this.version())
if (this.seqNoAndPrimaryTerm() != null) ssb.seqNoAndPrimaryTerm(this.seqNoAndPrimaryTerm())
if (this.collapse() != null) ssb.collapse(this.collapse())
return ssb
}
@Suppress("MagicNumber")
fun convertDateStringToEpochMillis(dateString: String): Long {
val parts = dateString.split(" ")
require(parts.size == 2) { "Date in was not correct format" }
val dateParts = parts[0].split("-")
val timeParts = parts[1].split(":")

require((dateParts.size == 3 && timeParts.size == 3)) { "Date in was not correct format" }
val year = dateParts[0].toInt()
val month = dateParts[1].toInt()
val day = dateParts[2].toInt()

val hour = timeParts[0].toInt()
val minute = timeParts[1].toInt()
val second = timeParts[2].toInt()

val localDateTime = LocalDateTime.of(year, month, day, hour, minute, second)
val instant = localDateTime.toInstant(ZoneOffset.UTC)
return instant.toEpochMilli()
}
@Suppress("MagicNumber")
fun convertFixedIntervalStringToMs(fixedInterval: String): Long {
// Possible types are ms, s, m, h, d
val regex = """(\d+)([a-zA-Z]+)""".toRegex()
val matchResult = regex.find(fixedInterval)
?: throw IllegalArgumentException("Invalid interval format: $fixedInterval")

val numericValue = matchResult.groupValues[1].toLong()
val intervalType = matchResult.groupValues[2]

val milliseconds = when (intervalType) {
"ms" -> numericValue
"s" -> numericValue * 1000L
"m" -> numericValue * 60 * 1000L
"h" -> numericValue * 60 * 60 * 1000L
"d" -> numericValue * 24 * 60 * 60 * 1000L
"w" -> numericValue * 7 * 24 * 60 * 60 * 1000L
else -> throw IllegalArgumentException("Unsupported interval type: $intervalType")
}

return milliseconds
}

fun zonedDateTimeToMillis(zonedDateTime: ZonedDateTime): Long {
return zonedDateTime.toInstant().toEpochMilli()
}
Loading

0 comments on commit 577d333

Please sign in to comment.