diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index aa3422c57..0813a6d2f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert @@ -453,6 +455,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues)) sr.source().query(queryBuilder) } + sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) } return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId) } else { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 7262b9260..51a32b642 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -33,6 +33,7 @@ import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.parseSampleDocTags import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -116,7 +118,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { logger.error("Error setting up alerts and findings indices for monitor: $id", e) monitorResult = monitorResult.copy(error = AlertingException.wrap(e)) } - try { validate(monitor) } catch (e: Exception) { @@ -881,7 +882,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .size(monitorCtx.docLevelMonitorShardFetchSize) ) .preference(Preference.PRIMARY_FIRST.type()) - + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) { request.source().fetchSource(false) for (field in fieldsToFetch) { @@ -936,7 +939,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() { "$monitorInputIndices against query index $queryIndices" ) var response: SearchResponse + try { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) + response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 307c88b3b..e4bf22c60 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -52,6 +52,7 @@ data class MonitorRunnerExecutionContext( @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, @Volatile var indexTimeout: TimeValue? = null, + @Volatile var cancelAfterTimeInterval: TimeValue? = null, @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, @Volatile var fetchOnlyQueryFieldNames: Boolean = true, @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index a8f0a5f41..4e7a8783f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts @@ -150,6 +151,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon ALERT_BACKOFF_MILLIS.get(monitorCtx.settings), ALERT_BACKOFF_COUNT.get(monitorCtx.settings) ) + + monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count -> monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count) } @@ -166,6 +170,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + monitorCtx.cancelAfterTimeInterval = it + } monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings) monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) { monitorCtx.allowList = it diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index b9457655b..968a31c76 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -136,7 +136,6 @@ class TransportGetFindingsSearchAction @Inject constructor( ) } searchSourceBuilder.query(queryBuilder).trackTotalHits(true) - client.threadPool().threadContext.stashContext().use { scope.launch { try { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index a1c33c7b9..5c9e8f280 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -6,6 +6,8 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertService +import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.destination.Destination @@ -24,6 +26,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.script.Script +import kotlin.math.max private val logger = LogManager.getLogger("AlertingUtils") @@ -149,6 +152,16 @@ fun defaultToPerExecutionAction( return false } +fun getCancelAfterTimeInterval(): Long { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes + if (givenInterval == -1L) { + return givenInterval + } + return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes) +} + /** * Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block. * https://mustache.github.io/mustache.5.html diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt index 8583ae0db..2ba75fa8f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() { xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java) threadPool = Mockito.mock(ThreadPool::class.java) clusterService = Mockito.mock(ClusterService::class.java) - settings = Settings.builder().build() val settingSet = hashSetOf>() settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)