Skip to content

Commit

Permalink
initial commit for remote monitor support
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed May 15, 2024
1 parent 66359f8 commit 197b7f8
Show file tree
Hide file tree
Showing 27 changed files with 874 additions and 91 deletions.
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
import java.util.concurrent.Callable
import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -114,6 +115,7 @@ dependencies {

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
17 changes: 17 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
Expand Down Expand Up @@ -52,6 +53,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
Expand Down Expand Up @@ -110,6 +112,7 @@ import org.opensearch.painless.spi.AllowlistLoader
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -162,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -277,6 +281,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand Down Expand Up @@ -409,4 +414,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)
)
}

override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
val monitorType = monitorExtension.getMonitorType()
val monitorRunner = monitorExtension.getMonitorRunner()

if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.*

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -185,10 +187,16 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val runContext = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
return if (runContext != null) {
Expand All @@ -208,10 +216,16 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
val runContext = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext
)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
Expand Down Expand Up @@ -41,6 +42,7 @@ data class MonitorRunnerExecutionContext(
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,
var findingsToTriggeredQueries: Map<String, List<DocLevelQuery>>? = null,
var remoteMonitors: Map<String, RemoteMonitorRegistry> = mapOf(),

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
Expand All @@ -26,11 +27,15 @@ import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.RemoteMonitorTriggerRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
Expand Down Expand Up @@ -63,7 +68,9 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.Script
Expand Down Expand Up @@ -156,6 +163,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerRemoteMonitors(monitorRegistry: Map<String, RemoteMonitorRegistry>): MonitorRunnerService {
this.monitorCtx.remoteMonitors = monitorRegistry
return this
}

// Must be called after registerClusterService and registerSettings in AlertingPlugin
fun registerConsumers(): MonitorRunnerService {
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
Expand Down Expand Up @@ -423,43 +435,88 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
CompositeWorkflowRunner.runWorkflow(workflow = job, monitorCtx, periodStart, periodEnd, dryrun, transportService)
}
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner().runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService

if (monitor.isMonitorOfStandardType()) {
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner().runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else {
QueryLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
}
return runResult
} else {
QueryLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
if (monitorCtx.remoteMonitors.containsKey(monitor.monitorType)) {
val remoteRunResult = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.runMonitor(
monitor,
periodStart,
periodEnd,
dryrun,
transportService
)
return MonitorRunResult(
monitor.name,
periodStart,
periodEnd,
remoteRunResult.error,
InputRunResults(remoteRunResult.results, remoteRunResult.error),
remoteRunResult.triggerResults.map { triggerResult ->
triggerResult.key to RemoteMonitorTriggerRunResult(
triggerResult.value.triggerName,
triggerResult.value.error,
triggerResult.value.actionResultsMap.map { actionResult ->
actionResult.key to actionResult.value.map {
it.key to ActionRunResult(
it.value.actionId,
it.value.actionName,
it.value.output,
it.value.throttled,
it.value.executionTime,
it.value.error
)
}.associate { it.first to it.second }.toMutableMap()
}.associate { it.first to it.second }.toMutableMap()
)
}.associate { it.first to it.second }
)
} else {
return MonitorRunResult<RemoteMonitorTriggerRunResult>(
monitor.name,
periodStart,
periodEnd,
OpenSearchStatusException("Monitor Type ${monitor.monitorType} not known", RestStatus.BAD_REQUEST)
)
}
}
return runResult
}

// TODO: See if we can move below methods (or few of these) to a common utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.*

object QueryLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = when (monitor.monitorType) {
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
Expand All @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Loading

0 comments on commit 197b7f8

Please sign in to comment.