Bulk index findings in batches of 10000 and make it configurable
Signed-off-by: Megha Goyal <[email protected]>
goyamegh committed Feb 1, 2024
1 parent 28530dd commit 03b86ae
Showing 4 changed files with 51 additions and 20 deletions.
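The change batches finding writes: index requests are accumulated, flushed through the bulk API once the configurable batch size is reached (default 10,000, setting plugins.alerting.alert_findings_indexing_batch_size), and the remainder is flushed at the end of the run. The sketch below illustrates that accumulate-and-flush pattern in isolation; it is not the plugin's exact code, the client handle, function name, and inputs are illustrative, and import paths may vary slightly across OpenSearch versions.

import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client
import org.opensearch.common.xcontent.XContentType

// Minimal sketch of batched bulk indexing with a configurable threshold.
fun indexFindingsInBatches(
    client: Client,
    findingsIndex: String,
    findingJsonById: Map<String, String>, // finding id -> finding document as JSON
    batchSize: Int = 10000                // mirrors DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
) {
    val pending = mutableListOf<IndexRequest>()

    fun flush() {
        if (pending.isEmpty()) return
        val response = client.bulk(
            BulkRequest().add(pending).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        ).actionGet()
        // Surface per-item failures instead of failing the whole run.
        response.items.filter { it.isFailed }.forEach {
            println("Failed to index finding ${it.id}: ${it.failureMessage}")
        }
        pending.clear()
    }

    findingJsonById.forEach { (id, json) ->
        pending += IndexRequest(findingsIndex)
            .source(json, XContentType.JSON)
            .id(id)
            .routing(id)
        if (pending.size >= batchSize) flush() // flush a full batch
    }
    flush() // flush whatever is left over
}

Capping each bulk request at a fixed batch size keeps the request payload and memory bounded even when a single monitor run produces a very large number of findings.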
@@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
             AlertingSettings.FINDING_HISTORY_MAX_DOCS,
             AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
             AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
-            AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
+            AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
+            AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
         )
     }

@@ -24,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult
 import org.opensearch.alerting.model.userErrorMessage
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
+import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
 import org.opensearch.alerting.util.AlertingException
 import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.defaultToPerExecutionAction
@@ -476,6 +477,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val findingDocPairs = mutableListOf<Pair<String, String>>()
         val findings = mutableListOf<Finding>()
         val indexRequests = mutableListOf<IndexRequest>()
+        monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
+        monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
+            monitorCtx.findingsIndexBatchSize = it
+        }

         docsToQueries.forEach {
             val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
@@ -502,39 +507,55 @@
                 .string()
             logger.debug("Findings: $findingStr")

-            if (shouldCreateFinding) {
-                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
-                    .source(findingStr, XContentType.JSON)
-                    .id(finding.id)
-                    .routing(finding.id)
-                    .opType(DocWriteRequest.OpType.INDEX)
+            if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
+                // make bulk indexing call here and flush the indexRequest object
+                bulkIndexFindings(monitor, monitorCtx, indexRequests)
+                indexRequests.clear()
+            } else {
+                if (shouldCreateFinding) {
+                    indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                        .source(findingStr, XContentType.JSON)
+                        .id(finding.id)
+                        .routing(finding.id)
+                        .opType(DocWriteRequest.OpType.INDEX)
+                }
             }
         }

+        if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
+            bulkIndexFindings(monitor, monitorCtx, indexRequests)
+        }

+        try {
+            findings.forEach { finding ->
+                publishFinding(monitor, monitorCtx, finding)
+            }
+        } catch (e: Exception) {
+            // suppress exception
+            logger.error("Optional finding callback failed", e)
+        }
+        return findingDocPairs
+    }

+    private suspend fun bulkIndexFindings(
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        indexRequests: List<IndexRequest>
+    ) {
         if (indexRequests.isNotEmpty()) {
             val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
                 bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
             }
             if (bulkResponse.hasFailures()) {
                 bulkResponse.items.forEach { item ->
                     if (item.isFailed) {
-                        logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                        logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
                     }
                 }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
             }
         }
-
-        try {
-            findings.forEach { finding ->
-                publishFinding(monitor, monitorCtx, finding)
-            }
-        } catch (e: Exception) {
-            // suppress exception
-            logger.error("Optional finding callback failed", e)
-        }
-        return findingDocPairs
     }

     private fun publishFinding(
@@ -658,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
                 }
             } catch (e: Exception) {
-                logger.warn("Failed to run for shard $shard. Error: ${e.message}")
+                logger.error("Failed to run for shard $shard. Error: ${e.message}")
             }
         }
         return matchingDocs
@@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
     @Volatile var destinationContextFactory: DestinationContextFactory? = null,

     @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
-    @Volatile var indexTimeout: TimeValue? = null
+    @Volatile var indexTimeout: TimeValue? = null,
+    @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
 )
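Together with the hunks above, this field completes the usual OpenSearch dynamic-setting wiring: the Setting is declared with Setting.Property.Dynamic, registered by the plugin, read once into a @Volatile field, and kept current through a settings-update consumer. The following is a generic illustration of that wiring with hypothetical names (plugins.example.batch_size, ExampleBatchConfig); it is a sketch of the pattern, not code from this commit.

import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings

object ExampleBatchConfig {
    // Declared Dynamic + NodeScope, mirroring FINDINGS_INDEXING_BATCH_SIZE above.
    val EXAMPLE_BATCH_SIZE = Setting.intSetting(
        "plugins.example.batch_size", 10000, 0,
        Setting.Property.NodeScope, Setting.Property.Dynamic
    )

    // @Volatile so monitor executions always see the latest value applied
    // by the cluster-settings update thread.
    @Volatile var batchSize: Int = 10000

    // Assumes EXAMPLE_BATCH_SIZE is also returned from the plugin's getSettings(),
    // just as the new setting is registered in AlertingPlugin above.
    fun bind(settings: Settings, clusterService: ClusterService) {
        batchSize = EXAMPLE_BATCH_SIZE.get(settings) // initial value at startup
        clusterService.clusterSettings.addSettingsUpdateConsumer(EXAMPLE_BATCH_SIZE) {
            batchSize = it // applied live on cluster-settings updates
        }
    }
}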
@@ -17,6 +17,7 @@ class AlertingSettings {

     companion object {
         const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
+        const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000

         val ALERTING_MAX_MONITORS = Setting.intSetting(
             "plugins.alerting.monitor.max_monitors",
@@ -153,5 +154,12 @@
             -1L,
             Setting.Property.NodeScope, Setting.Property.Dynamic
         )
+
+        val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
+            "plugins.alerting.alert_findings_indexing_batch_size",
+            DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
+            0,
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
     }
 }
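Because the new setting is declared with Setting.Property.Dynamic, it can be changed on a running cluster without a restart. Below is a hedged sketch of doing that from Kotlin through the cluster-settings API; the client handle and function name are illustrative, and the same update can equally be issued as a PUT _cluster/settings request.

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings

// Illustrative helper: persistently lower (or raise) the findings indexing batch size.
fun updateFindingsBatchSize(client: Client, newSize: Int) {
    val request = ClusterUpdateSettingsRequest().persistentSettings(
        Settings.builder()
            .put("plugins.alerting.alert_findings_indexing_batch_size", newSize)
            .build()
    )
    // Nodes pick the new value up through the settings-update consumer registered
    // in DocumentLevelMonitorRunner above.
    client.admin().cluster().updateSettings(request).actionGet()
}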
