Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed monitor type checks. #1558

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.MonitorType
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand All @@ -48,6 +49,8 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.Locale
import kotlin.collections.HashMap

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -185,10 +188,10 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val runContext = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
return if (runContext != null) {
Expand All @@ -208,12 +211,13 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
val runContext =
if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
id = MonitorMetadata.getId(monitor, workflowMetadataId),
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " +
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.Locale

object QueryLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = when (monitor.monitorType) {
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
Expand All @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.rest.RestResponse
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)

Expand Down Expand Up @@ -98,7 +99,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
validateDataSources(monitor)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
when (Monitor.MonitorType.valueOf(monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -82,7 +83,7 @@ class TransportExecuteMonitorAction @Inject constructor(
}
try {
log.info(
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " +
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType}, " +
"periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}"
)
val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun, transportService)
Expand Down Expand Up @@ -134,7 +135,7 @@ class TransportExecuteMonitorAction @Inject constructor(
false -> (execMonitorRequest.monitor as Monitor).copy(user = user)
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
try {
scope.launch {
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.MonitorType
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
Expand All @@ -82,6 +83,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Duration
import java.util.Locale

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -525,7 +527,7 @@ class TransportIndexMonitorAction @Inject constructor(
throw t
}
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
Expand Down Expand Up @@ -683,7 +685,7 @@ class TransportIndexMonitorAction @Inject constructor(
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
// Recreate runContext if metadata exists
// Delete and insert all queries from/to queryIndex
if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (!created && MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) {
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
client.suspendUntil<Client, BulkByScrollResponse> {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.util.Locale
import java.util.UUID
import java.util.stream.Collectors

Expand Down Expand Up @@ -400,7 +401,7 @@ class TransportIndexWorkflowAction @Inject constructor(
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
}
Expand Down Expand Up @@ -554,7 +555,10 @@ class TransportIndexWorkflowAction @Inject constructor(
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (
!created &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
) {
var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
Expand Down Expand Up @@ -632,7 +636,7 @@ class TransportIndexWorkflowAction @Inject constructor(
* Returns list of indices for the given monitor depending on it's type
*/
private fun getMonitorIndices(monitor: Monitor): List<String> {
return when (monitor.monitorType) {
return when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices }
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import java.util.Locale
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")
Expand Down Expand Up @@ -78,9 +79,11 @@ fun Destination.isAllowed(allowList: List<String>): Boolean = allowList.contains

fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTION

fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR
fun Monitor.isDocLevelMonitor(): Boolean =
Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR

fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR
fun Monitor.isQueryLevelMonitor(): Boolean =
Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR

/**
* Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ fun randomADMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime,
user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fun randomQueryLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -113,7 +113,7 @@ fun randomQueryLevelMonitorWithoutUser(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -137,7 +137,7 @@ fun randomBucketLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -162,7 +162,7 @@ fun randomBucketLevelMonitor(
dataSources: DataSources
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(),
dataSources = dataSources
Expand All @@ -181,7 +181,7 @@ fun randomClusterMetricsMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -199,7 +199,7 @@ fun randomDocumentLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -219,7 +219,7 @@ fun randomDocumentLevelMonitor(
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
Expand Down
Loading