Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit for remote monitor support #1547

Merged
merged 13 commits into from
May 31, 2024
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the same dependency used by jobscheduler or notification for shadow plugin?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

import java.util.concurrent.Callable
import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -114,6 +115,7 @@ dependencies {

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
Expand Down Expand Up @@ -52,6 +53,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
Expand Down Expand Up @@ -110,6 +112,7 @@ import org.opensearch.painless.spi.AllowlistLoader
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -162,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -277,6 +281,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand Down Expand Up @@ -409,4 +414,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)
)
}

override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
val monitorType = monitorExtension.getMonitorType()
val monitorRunner = monitorExtension.getMonitorRunner()

if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.*

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -185,10 +187,16 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (
monitor.isMonitorOfStandardType() &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just check string value is doc level monitor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just want to use a single string constant, so just reusing the enum value.

Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val runContext = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
return if (runContext != null) {
Expand All @@ -208,10 +216,16 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (
monitor.isMonitorOfStandardType() &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
val runContext = if (
monitor.isMonitorOfStandardType() &&
Copy link
Member

@eirsep eirsep May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract into method - isDocLevelMonitor()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext
)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
Expand Down Expand Up @@ -41,6 +42,7 @@ data class MonitorRunnerExecutionContext(
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,
var findingsToTriggeredQueries: Map<String, List<DocLevelQuery>>? = null,
var remoteMonitors: Map<String, RemoteMonitorRegistry> = mapOf(),

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
Expand All @@ -26,11 +27,15 @@ import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.RemoteMonitorTriggerRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
Expand Down Expand Up @@ -63,7 +68,9 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.Script
Expand Down Expand Up @@ -156,6 +163,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerRemoteMonitors(monitorRegistry: Map<String, RemoteMonitorRegistry>): MonitorRunnerService {
this.monitorCtx.remoteMonitors = monitorRegistry
return this
}

// Must be called after registerClusterService and registerSettings in AlertingPlugin
fun registerConsumers(): MonitorRunnerService {
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
Expand Down Expand Up @@ -423,43 +435,88 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
CompositeWorkflowRunner.runWorkflow(workflow = job, monitorCtx, periodStart, periodEnd, dryrun, transportService)
}
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner().runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService

if (monitor.isMonitorOfStandardType()) {
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner().runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
} else {
QueryLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
}
return runResult
} else {
QueryLevelMonitorRunner.runMonitor(
monitor,
monitorCtx,
periodStart,
periodEnd,
dryrun,
executionId = executionId,
transportService = transportService
)
if (monitorCtx.remoteMonitors.containsKey(monitor.monitorType)) {
val remoteRunResult = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.runMonitor(
monitor,
periodStart,
periodEnd,
dryrun,
transportService
)
return MonitorRunResult(
monitor.name,
periodStart,
periodEnd,
remoteRunResult.error,
InputRunResults(remoteRunResult.results, remoteRunResult.error),
remoteRunResult.triggerResults.map { triggerResult ->
triggerResult.key to RemoteMonitorTriggerRunResult(
triggerResult.value.triggerName,
triggerResult.value.error,
triggerResult.value.actionResultsMap.map { actionResult ->
actionResult.key to actionResult.value.map {
it.key to ActionRunResult(
it.value.actionId,
it.value.actionName,
it.value.output,
it.value.throttled,
it.value.executionTime,
it.value.error
)
}.associate { it.first to it.second }.toMutableMap()
}.associate { it.first to it.second }.toMutableMap()
)
}.associate { it.first to it.second }
)
} else {
return MonitorRunResult<RemoteMonitorTriggerRunResult>(
monitor.name,
periodStart,
periodEnd,
OpenSearchStatusException("Monitor Type ${monitor.monitorType} not known", RestStatus.BAD_REQUEST)
)
}
}
return runResult
}

// TODO: See if we can move below methods (or few of these) to a common utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.*

object QueryLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = when (monitor.monitorType) {
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
Expand All @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Loading
Loading