Skip to content

Commit

Permalink
Add jvm aware setting and max num docs settings for batching docs for…
Browse files Browse the repository at this point in the history
… percolate queries (#1435)

* add jvm aware and max docs settings for batching docs for percolate queries

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix stats logging

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add queryfieldnames field in findings mapping

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored and jowg-amazon committed Mar 14, 2024
1 parent aaf4657 commit e6379de
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
Expand Down Expand Up @@ -268,6 +269,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
Expand Down Expand Up @@ -325,6 +327,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_MAX_DOCS,
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool

Expand All @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext(
var alertService: AlertService? = null,
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.core.action.ActionListener
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -134,6 +135,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService {
this.monitorCtx.jvmStats = jvmStats
return this
}

// Must be called after registerClusterService and registerSettings in AlertingPlugin
fun registerConsumers(): MonitorRunnerService {
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
Expand Down Expand Up @@ -258,11 +264,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
}
}
is Monitor -> {
launch {
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
}
}
Expand Down Expand Up @@ -307,7 +321,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else {
QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@ class AlertingSettings {
Setting.Property.Dynamic
)

/** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query
* index in document level monitor execution.
*/
val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit",
10,
0,
100,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
* query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current
* execution
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
300000, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val INPUT_TIMEOUT = Setting.positiveTimeSetting(
"plugins.alerting.input_timeout",
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
executionId
)
} else if (delegateMonitor.isDocLevelMonitor()) {
return DocumentLevelMonitorRunner.runMonitor(
return DocumentLevelMonitorRunner().runMonitor(
delegateMonitor,
monitorCtx,
periodStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
"type" : "keyword"
}
}
},
"query_field_names": {
"type": "keyword"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf())
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName)
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
Expand Down

0 comments on commit e6379de

Please sign in to comment.