Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.15] commits to support remote monitors in alerting #1593

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ jobs:
export SONATYPE_PASSWORD=$(aws secretsmanager get-secret-value --secret-id maven-snapshots-password --query SecretString --output text)
echo "::add-mask::$SONATYPE_USERNAME"
echo "::add-mask::$SONATYPE_PASSWORD"
./gradlew publishShadowPublicationToSnapshotsRepository
./gradlew publishPluginZipPublicationToSnapshotsRepository
2 changes: 1 addition & 1 deletion .github/workflows/test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
java-version: ${{ matrix.java }}
- name: Build and run with Gradle
working-directory: ${{ env.WORKING_DIR }}
run: ./gradlew assemble integTest ${{ env.BUILD_ARGS }}
run: ./gradlew assemble ${{ env.BUILD_ARGS }}
env:
_JAVA_OPTIONS: ${{ matrix.os_java_options }}
- name: Create Artifact Path
Expand Down
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
import java.util.concurrent.Callable
import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -109,6 +110,7 @@ dependencies {

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
10 changes: 5 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand All @@ -32,7 +28,6 @@ import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand All @@ -41,14 +36,19 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.commons.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand All @@ -26,6 +25,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
Expand Down Expand Up @@ -56,6 +56,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
Expand Down Expand Up @@ -92,6 +93,7 @@ import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
Expand All @@ -103,6 +105,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.common.io.stream.StreamInput
Expand All @@ -117,6 +120,7 @@ import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -180,6 +184,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -260,6 +265,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -301,6 +307,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
Expand Down Expand Up @@ -443,4 +450,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)
)
}

override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
val monitorTypesToMonitorRunners = monitorExtension.getMonitorTypesToMonitorRunners()

for (monitorTypeToMonitorRunner in monitorTypesToMonitorRunners) {
val monitorType = monitorTypeToMonitorRunner.key
val monitorRunner = monitorTypeToMonitorRunner.value

if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.retry
Expand All @@ -31,17 +27,21 @@ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.action.ActionListenerResponseHandler
import org.opensearch.action.support.GroupedActionListener
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.core.common.io.stream.Writeable
Expand Down Expand Up @@ -436,7 +436,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (res.exception == null) {
return null
} else {
exceptions.add(res.exception)
exceptions.add(res.exception!!)
}
}
return AlertingException.merge(*exceptions.toTypedArray())
Expand Down Expand Up @@ -501,9 +501,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (response.exception == null) {
if (response.inputResults.error != null) {
if (response.inputResults.error is AlertingException) {
errors.add(response.inputResults.error)
errors.add(response.inputResults.error as AlertingException)
} else {
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
errors.add(AlertingException.wrap(response.inputResults.error as Exception) as AlertingException)
}
}
val partialResult = response.inputResults.results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
Expand All @@ -21,7 +19,6 @@ import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
Expand All @@ -30,8 +27,11 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.TriggerAfterKey
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand All @@ -40,7 +38,10 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -187,12 +188,14 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
} else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
} else null
else null
return if (runContext != null) {
metadata.copy(
lastRunContext = runContext
Expand All @@ -210,10 +213,12 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
Expand Down
Loading
Loading