Skip to content

Commit

Permalink
add tests to verify seq_no calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Mar 1, 2024
1 parent d10cd9d commit d03b33f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
Expand Down Expand Up @@ -323,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,16 +701,16 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
fieldsToBeQueried,
)
if (hits.hits.isEmpty()) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
if (to == Long.MAX_VALUE) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs
}
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed

updateLastRunContext(shard, hits.hits[0].seqNo.toString())
to = hits.hits[0].seqNo - 10000L
} else {
to -= 10000L
}
val leastSeqNoFromHits = hits.hits.last().seqNo
to = leastSeqNoFromHits - 1
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
Expand Down Expand Up @@ -841,7 +841,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000)
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
Expand Down Expand Up @@ -202,6 +203,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

monitorCtx.docLevelMonitorShardFetchSize =
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) {
monitorCtx.docLevelMonitorShardFetchSize = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class AlertingSettings {
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10
const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand All @@ -38,6 +39,16 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
 * Integer setting that exists purely to verify the seq_no calculation of document level monitors.
 *
 * Defaults to [DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE]; the two literals that follow the default
 * (1 and 10000) are the minimum and maximum values accepted by [Setting.intSetting].
 * Declared with node scope and as a dynamic setting.
 *
 * NOTE(review): presumably consumed as the per-shard search page size by the document level
 * monitor runner — confirm at the call site.
 */
val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_shard_fetch_size",
DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
1,
10000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,49 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

/**
 * Indexes ten matching documents, deletes the most recently indexed one, and then executes a
 * document level monitor with the shard fetch size lowered to 2.
 *
 * Every trigger result is expected to carry exactly nine action results, i.e. one per document
 * that is still present in the index.
 */
fun `test seq_no calculation correctness when docs are deleted`() {
    // A fetch size of 2 is far below the number of indexed docs.
    // NOTE(review): presumably this forces the runner to page through the shard several times — confirm.
    adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.key, 2)

    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

    val testIndex = createTestIndex()

    // Monitor input: a single query that matches every document indexed below.
    val regionQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val monitorInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(regionQuery))

    // Template is built before the destination is created, same as a single nested call would do.
    val messageTemplate = randomTemplateScript("Hello {{ctx.monitor.name}}")
    val notifyAction = randomAction(template = messageTemplate, destinationId = createDestination().id)
    val alwaysOnTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(notifyAction))
    val monitor = randomDocumentLevelMonitor(
        inputs = listOf(monitorInput),
        triggers = listOf(alwaysOnTrigger)
    )

    // Ten documents, indexed in this exact order.
    val docIds = listOf("1", "2", "3", "4", "5", "11", "21", "31", "41", "51")
    docIds.forEach { docId -> indexDoc(testIndex, docId, testDoc) }

    // Remove the last document that was indexed before running the monitor.
    deleteDoc(testIndex, "51")
    val executeResponse = executeMonitor(monitor, params = mapOf("dryrun" to "false"))

    val responseBody = entityAsMap(executeResponse)
    assertEquals(monitor.name, responseBody["monitor_name"])

    // Nine documents remain, so nine action results are expected for each trigger.
    responseBody.objectMap("trigger_results").values.forEach { triggerResult ->
        assertEquals(9, triggerResult.objectMap("action_results").values.size)
    }
}

fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand All @@ -163,6 +206,10 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
)

indexDoc(index, "1", testDoc)
indexDoc(index, "2", testDoc)
indexDoc(index, "3", testDoc)
indexDoc(index, "4", testDoc)
indexDoc(index, "5", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

Expand Down

0 comments on commit d03b33f

Please sign in to comment.