Skip to content

Commit

Permalink
[Backport 2.11] fix monitor renew lock issue (#1627) (#1631)
Browse files Browse the repository at this point in the history
* fix monitor renew lock issue (#1627)

Signed-off-by: Subhobrata Dey <[email protected]>

* fix ktlint

Signed-off-by: Subhobrata Dey <[email protected]>

* fix compilation issues

Signed-off-by: Subhobrata Dey <[email protected]>

---------

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Jul 31, 2024
1 parent ee712da commit 2507bd6
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 14 deletions.
1 change: 1 addition & 0 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
if docker pull opensearchstaging/opensearch:$docker_version
then
echo "FROM opensearchstaging/opensearch:$docker_version" >> Dockerfile
echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-security-analytics ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-security-analytics; fi" >> Dockerfile
echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-alerting ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-alerting; fi" >> Dockerfile
echo "ADD alerting/build/distributions/$plugin /tmp/" >> Dockerfile
echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/$plugin" >> Dockerfile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
var lock: LockModel? = null
var workflowLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
workflowLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${workflowLock.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
Expand All @@ -343,20 +343,22 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Workflow run failed for workflow with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(workflowLock, it) }
logger.debug("lock ${workflowLock?.lockId} released")
}
}
}
is Monitor -> {
launch {
var lock: LockModel? = null
var monitorLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${monitorLock.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
Expand All @@ -375,9 +377,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Monitor run failed for monitor with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody)
Expand All @@ -930,6 +930,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

fun updateDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
)
return response
}

protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class DocumentMonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -2435,4 +2436,121 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue(e.message!!.contains("illegal_argument_exception"))
}
}

fun `test execute monitor generates alerts and findings with renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
enabled = true
)
)
assertNotNull(monitor.id)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

var found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 2)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)

updateMonitor(monitor.copy(enabled = false, enabledTime = null))

val currTimeStampMinusTenMinutes = System.currentTimeMillis() - 600000L
val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":${currTimeStampMinusTenMinutes / 1000},\"released\":false}"
updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

updateMonitor(monitor.copy(enabled = true), true)

indexDoc(testIndex, "4", testDoc)
indexDoc(testIndex, "5", testDoc)

found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 4)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)
assertTrue(true)
}

fun `test execute monitor generates alerts and findings with non renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
enabled = true
)
)
assertNotNull(monitor.id)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

var found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 2)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)

val currTimeStamp = System.currentTimeMillis()
val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":$currTimeStamp,\"released\":false}"
updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

indexDoc(testIndex, "4", testDoc)
indexDoc(testIndex, "5", testDoc)

found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 4)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
Expand All @@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.index.seqno.SequenceNumbers
import java.io.IOException
import java.time.Instant
import java.util.concurrent.TimeUnit

private val log = LogManager.getLogger(LockService::class.java)

Expand All @@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste

companion object {
const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock"
val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES)

@JvmStatic
fun lockMapping(): String? {
Expand Down Expand Up @@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste
object : ActionListener<LockModel> {
override fun onResponse(existingLock: LockModel?) {
if (existingLock != null) {
val currentTimestamp = getNow()
if (isLockReleased(existingLock)) {
log.debug("lock is released or expired: {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT released or expired. {}", existingLock)
listener.onResponse(null)
log.debug("Lock is NOT released. {}", existingLock)
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_MINUTES.seconds
< currentTimestamp.epochSecond
) {
log.debug("Lock is expired. Renewing Lock {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT expired. Not running monitor {}", existingLock)
listener.onResponse(null)
}
}
} else {
val tempLock = LockModel(scheduledJobId, getNow(), false)
Expand Down Expand Up @@ -220,7 +233,7 @@ class LockService(private val client: Client, private val clusterService: Cluste
listener: ActionListener<Boolean>
) {
if (lock == null) {
log.debug("Lock is null. Nothing to release.")
log.error("Lock is null. Nothing to release.")
listener.onResponse(false)
} else {
log.debug("Releasing lock: {}", lock)
Expand Down

0 comments on commit 2507bd6

Please sign in to comment.