Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
goyamegh committed Feb 5, 2024
1 parent 03b86ae commit 1db2c00
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
// confirm if this is right or only trigger-able findings should be present in this list
relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)
Expand Down Expand Up @@ -477,10 +476,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
monitorCtx.findingsIndexBatchSize = it
}

docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
Expand All @@ -507,22 +502,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.string()
logger.debug("Findings: $findingStr")

if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
// make bulk indexing call here and flush the indexRequest object
bulkIndexFindings(monitor, monitorCtx, indexRequests)
indexRequests.clear()
} else {
if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
.opType(DocWriteRequest.OpType.INDEX)
}
if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.opType(DocWriteRequest.OpType.CREATE)
}
}

if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
if (indexRequests.isNotEmpty()) {
bulkIndexFindings(monitor, monitorCtx, indexRequests)
}

Expand All @@ -542,7 +530,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
indexRequests: List<IndexRequest>
) {
if (indexRequests.isNotEmpty()) {
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)

indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
Expand Down Expand Up @@ -169,6 +170,10 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon

monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
monitorCtx.findingsIndexBatchSize = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AlertingSettings {

companion object {
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand Down Expand Up @@ -158,7 +158,7 @@ class AlertingSettings {
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
"plugins.alerting.alert_findings_indexing_batch_size",
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
0,
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
Expand Down

0 comments on commit 1db2c00

Please sign in to comment.