add distributed locking to jobs in alerting (#1542)
Signed-off-by: Subhobrata Dey <[email protected]>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
sbcd90 and opensearch-trigger-bot[bot] authored May 2, 2024
1 parent 5d4750e commit ca70598
Showing 10 changed files with 656 additions and 26 deletions.
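At a high level, every scheduled job execution is now wrapped in an acquire/execute/release cycle against a per-job lock document, so only one node runs a given monitor or workflow at a time. The sketch below distills the pattern applied in MonitorRunnerService.runJob further down in this diff; it assumes the LockService/LockModel API this change introduces in alerting core (acquireLock(job, listener) answering null when another node holds the lock, release(lock, listener) freeing it) together with alerting's suspendUntil coroutine bridge.

```kotlin
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.commons.alerting.model.ScheduledJob

// Hypothetical helper distilling the pattern in runJob: returns null (skipping
// the run) when the lock is already held by another node.
suspend fun <T> runWithJobLock(
    lockService: LockService,
    job: ScheduledJob,
    execute: suspend () -> T
): T? {
    val lock = lockService.suspendUntil<LockService, LockModel?> { acquireLock(job, it) }
        ?: return null
    return try {
        execute()
    } finally {
        // Release even when execute() throws, so the next interval is not
        // blocked until the lock's lease expires.
        lockService.suspendUntil<LockService, Boolean> { release(lock, it) }
    }
}
```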
@@ -21,6 +21,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
@@ -259,6 +260,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): Collection<Any> {
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
val triggerService = TriggerService(scriptService)
@@ -277,6 +279,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
@@ -301,7 +304,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

DeleteMonitorService.initialize(client)
DeleteMonitorService.initialize(client, lockService)

return listOf(
sweeper,
@@ -311,7 +314,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertService,
triggerService
triggerService,
lockService
)
}

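Note the dual wiring above: the same LockService instance is handed to MonitorRunnerService through registerLockService(...) and also added to the collection returned from createComponents, which binds it in the node's injector. That second step is what lets transport actions later in this diff receive it by constructor injection; roughly as below, with hypothetical action/request/response names standing in for the real ones.

```kotlin
// Illustration fragment only: the action, request, and response names are made
// up; the real example in this commit is TransportDeleteWorkflowAction below.
class TransportDeleteFooAction @Inject constructor(
    transportService: TransportService,
    actionFilters: ActionFilters,
    val client: Client,
    val lockService: LockService // satisfied by the instance createComponents returned
) : HandledTransportAction<DeleteFooRequest, DeleteFooResponse>(
    DELETE_FOO_ACTION_NAME, transportService, actionFilters, ::DeleteFooRequest
)
```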
@@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -60,5 +61,6 @@ data class MonitorRunnerExecutionContext(
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var lockService: LockService? = null
)
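The LockModel imported in the next file is the lock document persisted to the lock index. Its fields are defined in alerting core and are not part of this diff; a plausible shape, assuming it mirrors the job-scheduler lock design it is modeled on (a leased lock updated with optimistic concurrency), is:

```kotlin
import java.time.Instant

// Illustrative only: the real class is org.opensearch.alerting.core.lock.LockModel.
data class LockSketch(
    val jobId: String,             // id of the monitor/workflow the lock guards
    val lockId: String,            // deterministic id, cf. LockModel.generateLockId(jobId)
    val lockTime: Instant,         // when the lock was last (re)acquired
    val lockDurationSeconds: Long, // lease length; an expired lease may be re-acquired
    val released: Boolean,         // flipped by release(); deleteLock() removes the document
    val seqNo: Long,               // sequence number for compare-and-set updates
    val primaryTerm: Long          // primary term paired with seqNo
)
```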
@@ -24,6 +24,8 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
@@ -242,6 +244,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerLockService(lockService: LockService): MonitorRunnerService {
monitorCtx.lockService = lockService
return this
}

// Updates destination settings when the reload API is called so that new keystore values are visible
fun reloadDestinationSettings(settings: Settings) {
monitorCtx.destinationSettings = loadDestinationSettings(settings)
@@ -313,36 +320,64 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
),
it
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
ExecuteWorkflowAction.INSTANCE,
ExecuteWorkflowRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
),
it
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
is Monitor -> {
launch {
val executeMonitorRequest = ExecuteMonitorRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorResponse> {
monitorCtx.client!!.execute(
ExecuteMonitorAction.INSTANCE,
executeMonitorRequest,
it
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
val executeMonitorRequest = ExecuteMonitorRequest(
false,
TimeValue(periodEnd.toEpochMilli()),
job.id,
job,
TimeValue(periodStart.toEpochMilli())
)
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorResponse> {
monitorCtx.client!!.execute(
ExecuteMonitorAction.INSTANCE,
executeMonitorRequest,
it
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
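The runJob changes above lean heavily on alerting's suspendUntil extension, which bridges OpenSearch's callback-style ActionListener APIs into coroutines. For readers outside the codebase it behaves roughly as follows (a sketch; the real helper lives in org.opensearch.alerting.opensearchapi):

```kotlin
import org.opensearch.core.action.ActionListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// The block receives an ActionListener whose onResponse/onFailure resume the
// coroutine with the result or the failure, letting callback APIs be awaited.
suspend fun <C : Any, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
    suspendCoroutine { cont ->
        block(object : ActionListener<T> {
            override fun onResponse(response: T) = cont.resume(response)
            override fun onFailure(e: Exception) = cont.resumeWithException(e)
        })
    }
```

One detail visible in the hunk above: the lock is acquired inside the try block, so if acquireLock itself fails, the finally block runs with lock still null, and release(null, ...) plus the lock!!.lockId debug line assume that path is benign.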
@@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
@@ -48,11 +50,14 @@ object DeleteMonitorService :
private val log = LogManager.getLogger(this.javaClass)

private lateinit var client: Client
private lateinit var lockService: LockService

fun initialize(
client: Client,
lockService: LockService
) {
DeleteMonitorService.client = client
DeleteMonitorService.lockService = lockService
}

/**
@@ -64,6 +69,7 @@
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
deleteLock(monitor)
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
}

@@ -147,6 +153,10 @@
}
}

private suspend fun deleteLock(monitor: Monitor) {
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
}

/**
* Checks if the monitor is part of the workflow
*
@@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.opensearchapi.addFilter
@@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val lockService: LockService
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
@@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
} catch (t: Exception) {
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
}
try {
// Delete the workflow lock
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
log.error("Failed to delete workflow lock for $workflowId")
}
actionListener.onResponse(deleteWorkflowResponse)
} else {
actionListener.onFailure(
@@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
@@ -473,6 +474,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
}

fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val inputMap = HashMap<String, Any>()
inputMap["searchString"] = monitor.name

val responseMap = getAlerts(inputMap).asMap()
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
alerts.forEach {
assertTrue(it["error_message"] == null)
}
}
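The fixed four-minute sleep above waits for the scheduled runs (including the retry after the seeded version conflict) to complete. A less time-sensitive variant would poll instead, as the next test does with waitUntil; a sketch assuming the same getAlerts helper used above:

```kotlin
// Poll until the scheduled run has produced alerts, instead of sleeping 240s flat.
val inputMap = hashMapOf<String, Any>("searchString" to monitor.name)
OpenSearchTestCase.waitUntil({
    val alerts = getAlerts(inputMap).asMap()["alerts"] as ArrayList<Map<String, Any>>
    alerts.isNotEmpty()
}, 240, TimeUnit.SECONDS)
```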

@AwaitsFix(bugUrl = "")
fun `test monitor run generate lock and monitor delete removes lock`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
OpenSearchTestCase.waitUntil({
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
return@waitUntil (response.restStatus().status == 200)
}, 240, TimeUnit.SECONDS)

var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
var responseMap = entityAsMap(response)
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(1, noOfLocks)

deleteMonitor(monitor)
refreshIndex(LockService.LOCK_INDEX_NAME)
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
responseMap = entityAsMap(response)
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(0, noOfLocks)
}

fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
@@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
@@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}

fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = false,
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)
createWorkflow(
randomWorkflow(
monitorIds = listOf(monitor.id),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val alerts = searchAlerts(monitor)
alerts.forEach {
assertTrue(it.errorMessage == null)
}
}
}