Skip to content

Commit

Permalink
fix tests to verify aggregation query on alias optimziation with indi…
Browse files Browse the repository at this point in the history
…ces being skipped and not skipped

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Oct 21, 2024
1 parent 86cae1c commit cc4bfa4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,18 @@ class InputService(
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false // No need to include previous anymore
} else if (
includePrevious && i > 0 && sortedIndices[i - 1].creationDate <
resolveStartTimeOfQueryTimeRange.toEpochMilli()
includePrevious && (
i == sortedIndices.lastIndex ||
sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()
)
) {
// Include the index immediately before the timestamp
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false
}
}
} else {
// add alias without optimizing for resolve indices
resolvedIndexes.add(it)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,25 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
}
}

protected fun insertSampleTimeSerializedDataWithTime(
index: String,
data: List<String>,
time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS),
) {
data.forEachIndexed { i, value ->
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}

protected fun deleteDataWithDocIds(index: String, docIds: List<String>) {
docIds.forEach {
deleteDoc(index, it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1186,16 +1186,10 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, buckets.size)
}

fun `test execute bucket-level monitor with alias optimization - indices not skipped`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedDataCurrentTime(
testIndex,
listOf(
"test_value_3",
"test_value_4", // adding duplicate to verify aggregation
"test_value_5"
)
)
fun `test execute bucket-level monitor with alias optimization - indices not skipped from query`() {
val skipIndex = createTestIndex("to_skip_index")
val previousIndex = createTestIndex("to_include_index")

val indexMapping = """
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
Expand All @@ -1213,7 +1207,24 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
"test_value_2"
)
)
addIndexToAlias(testIndex, aliasName)
insertSampleTimeSerializedDataWithTime(
previousIndex,
listOf(
"test_value_3",
"test_value_4",
"test_value_5"
)
)
insertSampleTimeSerializedDataWithTime(
skipIndex,
listOf(
"test_value_6",
"test_value_7",
"test_value_8"
)
)
addIndexToAlias(previousIndex, aliasName)
addIndexToAlias(skipIndex, aliasName)
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10s")
.lte("{{period_end}}")
Expand Down Expand Up @@ -1246,18 +1257,30 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
assertEquals("Incorrect search result", 5, buckets.size)
Assert.assertEquals(buckets.size, 8)
}

fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() {
val testIndex = createTestIndex()
insertSampleTimeSerializedDataCurrentTime(
testIndex,
val skipIndex = createTestIndex("to_skip_index")
Thread.sleep(10000)
val previousIndex = createTestIndex("to_include_index")
insertSampleTimeSerializedDataWithTime(
previousIndex,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2"
)
"test_value_3",
"test_value_4",
"test_value_5"
),
ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10)
)
insertSampleTimeSerializedDataWithTime(
skipIndex,
listOf(
"test_value_6",
"test_value_7",
"test_value_8"
),
ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10)
)
Thread.sleep(10000)
val indexMapping = """
Expand All @@ -1277,7 +1300,8 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
"test_value_2"
)
)
addIndexToAlias(testIndex, aliasName)
addIndexToAlias(previousIndex, aliasName)
addIndexToAlias(skipIndex, aliasName)
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10s")
.lte("{{period_end}}")
Expand Down Expand Up @@ -1310,7 +1334,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
Assert.assertTrue(buckets.size <= 2)
Assert.assertTrue(buckets.size <= 5)
}

fun `test execute bucket-level monitor returns search result with multi term agg`() {
Expand Down

0 comments on commit cc4bfa4

Please sign in to comment.