Skip to content

Commit

Permalink
add distributed locking to jobs in alerting
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Feb 6, 2024
1 parent 3c50f7d commit e2a52df
Show file tree
Hide file tree
Showing 12 changed files with 647 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
Expand Down Expand Up @@ -243,6 +244,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): Collection<Any> {
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunnerService
.registerClusterService(clusterService)
Expand All @@ -258,6 +260,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
Expand All @@ -282,9 +285,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

DeleteMonitorService.initialize(client)
DeleteMonitorService.initialize(client, lockService)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
}

override fun getSettings(): List<Setting<*>> {
Expand Down Expand Up @@ -318,6 +321,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.TEST_MONITOR_NAME,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -255,6 +256,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
delayIfTempMonitor(monitor, monitorCtx)

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
Expand Down Expand Up @@ -800,4 +802,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
jsonAsMap.putAll(tempMap)
}

private fun delayIfTempMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext) {
if (monitor.name == monitorCtx.settings!!.get(AlertingSettings.TEST_MONITOR_NAME.key)) {
Thread.sleep(80000)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -47,5 +48,7 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,

@Volatile var lockService: LockService? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
Expand Down Expand Up @@ -180,6 +183,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerLockService(lockService: LockService): MonitorRunnerService {
monitorCtx.lockService = lockService
return this
}

// Updates destination settings when the reload API is called so that new keystore values are visible
fun reloadDestinationSettings(settings: Settings) {
monitorCtx.destinationSettings = loadDestinationSettings(settings)
Expand Down Expand Up @@ -251,12 +259,32 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
runJob(job, periodStart, periodEnd, false)
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
runJob(job, periodStart, periodEnd, false)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
is Monitor -> {
launch {
runJob(job, periodStart, periodEnd, false)
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
runJob(job, periodStart, periodEnd, false)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
Expand Down Expand Up @@ -49,11 +51,14 @@ object DeleteMonitorService :
private val log = LogManager.getLogger(this.javaClass)

private lateinit var client: Client
private lateinit var lockService: LockService

fun initialize(
client: Client,
lockService: LockService
) {
DeleteMonitorService.client = client
DeleteMonitorService.lockService = lockService
}

/**
Expand All @@ -65,6 +70,7 @@ object DeleteMonitorService :
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
deleteLock(monitor)
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
}

Expand Down Expand Up @@ -148,6 +154,10 @@ object DeleteMonitorService :
}
}

private suspend fun deleteLock(monitor: Monitor) {
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
}

/**
* Checks if the monitor is part of the workflow
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,11 @@ class AlertingSettings {
-1L,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val TEST_MONITOR_NAME = Setting.simpleString(
"plugins.alerting.integ_test.test_monitor_name",
"__lag-monitor-test__",
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.opensearchapi.addFilter
Expand Down Expand Up @@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val lockService: LockService
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
Expand Down Expand Up @@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
} catch (t: Exception) {
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
}
try {
// Delete the workflow lock
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
log.error("Failed to delete workflow lock for $workflowId")
}
actionListener.onResponse(deleteWorkflowResponse)
} else {
actionListener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,28 @@ import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.core.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale
import java.util.concurrent.TimeUnit

class DocumentMonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -147,6 +152,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
}

fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val inputMap = HashMap<String, Any>()
inputMap["searchString"] = monitor.name

val responseMap = getAlerts(inputMap).asMap()
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
alerts.forEach {
assertTrue(it["error_message"] == null)
}
}

@AwaitsFix(bugUrl = "")
fun `test monitor run generate lock and monitor delete removes lock`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
OpenSearchTestCase.waitUntil({
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
return@waitUntil (response.restStatus().status == 200)
}, 240, TimeUnit.SECONDS)

var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
var responseMap = entityAsMap(response)
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(1, noOfLocks)

deleteMonitor(monitor)
refreshIndex(LockService.LOCK_INDEX_NAME)
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
responseMap = entityAsMap(response)
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(0, noOfLocks)
}

fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Loading

0 comments on commit e2a52df

Please sign in to comment.