Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Jul 30, 2024
1 parent 93267b3 commit f84ef1c
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Workflow run failed for workflow with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(workflowLock, it) }
logger.debug("lock ${workflowLock?.lockId} released")
Expand Down Expand Up @@ -383,6 +385,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Monitor run failed for monitor with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody)
Expand All @@ -951,6 +951,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

/**
 * Sends a `PUT` request to `<index>/_doc/<id>` with [doc] as the JSON body and asserts
 * that the response status is either OK or CREATED.
 *
 * @param client REST client used to issue the request
 * @param index name of the target index
 * @param id id of the document to write
 * @param doc JSON source of the document
 * @param refresh when true, the request is sent with `refresh=true`
 * @return the response returned by the request
 */
fun updateDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
    val requestBody = StringEntity(doc, APPLICATION_JSON)
    val params = if (refresh) mapOf("refresh" to "true") else mapOf()
    val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
    // The failure message names the update operation and the document id so that a failing
    // run is not mistaken for a failure of the sibling indexDoc helper.
    assertTrue(
        "Unable to update doc '$id': '${doc.take(15)}...' in index: '$index'",
        response.restStatus() in listOf(RestStatus.OK, RestStatus.CREATED)
    )
    return response
}

protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class DocumentMonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -2748,4 +2749,121 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

deleteDataStream(aliasName)
}

/**
 * Verifies that a scheduled document-level monitor keeps producing findings after its lock
 * document has been overwritten with an unreleased lock whose `lock_time` lies ten minutes
 * in the past.
 *
 * Flow: create an enabled monitor on a one-minute schedule, index two matching documents and
 * wait for two findings; disable the monitor, overwrite its lock document, re-enable it,
 * index two more documents and wait for four findings in total.
 */
fun `test execute monitor generates alerts and findings with renewable locks`() {
    val testIndex = createTestIndex()
    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(docLevelInput),
            triggers = listOf(trigger),
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
            enabled = true
        )
    )
    assertNotNull(monitor.id)

    indexDoc(testIndex, "1", testDoc)
    indexDoc(testIndex, "2", testDoc)

    // waitUntil returns the result of its final predicate evaluation, so no external flag is needed.
    val initialFindingsFound = OpenSearchTestCase.waitUntil(
        { searchFindings(monitor).size == 2 },
        2,
        TimeUnit.MINUTES
    )
    assertTrue("Expected 2 findings before the lock document was overwritten", initialFindingsFound)

    updateMonitor(monitor.copy(enabled = false, enabledTime = null))

    // Overwrite the monitor's lock document with an unreleased lock dated ten minutes ago.
    // lock_time is written in epoch seconds (milliseconds / 1000).
    val currTimeStampMinusTenMinutes = System.currentTimeMillis() - 600000L
    val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":${currTimeStampMinusTenMinutes / 1000},\"released\":false}"
    updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

    updateMonitor(monitor.copy(enabled = true), true)

    indexDoc(testIndex, "4", testDoc)
    indexDoc(testIndex, "5", testDoc)

    val allFindingsFound = OpenSearchTestCase.waitUntil(
        { searchFindings(monitor).size == 4 },
        2,
        TimeUnit.MINUTES
    )
    assertTrue("Expected 4 findings after the stale lock was written and the monitor re-enabled", allFindingsFound)
}

/**
 * Verifies that a scheduled document-level monitor stops producing new findings while its
 * lock document holds an unreleased lock stamped with the current time.
 *
 * Flow: create an enabled monitor on a one-minute schedule, index two matching documents and
 * wait for two findings; overwrite the monitor's lock document, index two more documents and
 * assert that the findings count does not reach four within two minutes.
 */
fun `test execute monitor generates alerts and findings with non renewable locks`() {
    val testIndex = createTestIndex()
    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(docLevelInput),
            triggers = listOf(trigger),
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
            enabled = true
        )
    )
    assertNotNull(monitor.id)

    indexDoc(testIndex, "1", testDoc)
    indexDoc(testIndex, "2", testDoc)

    // waitUntil returns the result of its final predicate evaluation, so no external flag is needed.
    val initialFindingsFound = OpenSearchTestCase.waitUntil(
        { searchFindings(monitor).size == 2 },
        2,
        TimeUnit.MINUTES
    )
    assertTrue("Expected 2 findings before the lock document was overwritten", initialFindingsFound)

    // Overwrite the monitor's lock document with an unreleased lock stamped "now".
    // NOTE(review): lock_time is written in epoch milliseconds here, while the renewable-locks
    // test writes epoch seconds (millis / 1000) - confirm which unit the lock model parses.
    val currTimeStamp = System.currentTimeMillis()
    val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":$currTimeStamp,\"released\":false}"
    updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

    indexDoc(testIndex, "4", testDoc)
    indexDoc(testIndex, "5", testDoc)

    val newFindingsFound = OpenSearchTestCase.waitUntil(
        { searchFindings(monitor).size == 4 },
        2,
        TimeUnit.MINUTES
    )
    assertFalse("Expected no new findings while the monitor lock is held and not expired", newFindingsFound)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste

companion object {
const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock"
val LOCK_EXPIRED_SECONDS = TimeValue(5, TimeUnit.MINUTES)
val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES)

@JvmStatic
fun lockMapping(): String? {
Expand Down Expand Up @@ -82,14 +82,14 @@ class LockService(private val client: Client, private val clusterService: Cluste
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT released. {}", existingLock)
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_SECONDS.seconds
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_MINUTES.seconds
< currentTimestamp.epochSecond
) {
log.debug("Lock is expired. Renewing Lock {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT expired. {}", existingLock)
log.debug("Lock is NOT expired. Not running monitor {}", existingLock)
listener.onResponse(null)
}
}
Expand Down Expand Up @@ -233,7 +233,7 @@ class LockService(private val client: Client, private val clusterService: Cluste
listener: ActionListener<Boolean>
) {
if (lock == null) {
log.debug("Lock is null. Nothing to release.")
log.error("Lock is null. Nothing to release.")
listener.onResponse(false)
} else {
log.debug("Releasing lock: {}", lock)
Expand Down

0 comments on commit f84ef1c

Please sign in to comment.