-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial commit for remote monitor support #1547
Changes from all commits
197b7f8
0181b85
70f165f
8bd00b4
a840958
315d8ac
629a0c3
f48253c
77a83ff
4b88443
ae13a15
5564e3a
58a6f29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
package org.opensearch.alerting | ||
|
||
import org.opensearch.action.ActionRequest | ||
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction | ||
import org.opensearch.alerting.action.ExecuteMonitorAction | ||
import org.opensearch.alerting.action.ExecuteWorkflowAction | ||
import org.opensearch.alerting.action.GetDestinationsAction | ||
|
@@ -25,6 +24,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler | |
import org.opensearch.alerting.core.schedule.JobScheduler | ||
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings | ||
import org.opensearch.alerting.core.settings.ScheduledJobSettings | ||
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry | ||
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction | ||
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction | ||
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction | ||
|
@@ -52,6 +52,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON | |
import org.opensearch.alerting.settings.DestinationSettings | ||
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings | ||
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings | ||
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension | ||
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction | ||
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction | ||
import org.opensearch.alerting.transport.TransportDeleteMonitorAction | ||
|
@@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting | |
import org.opensearch.common.settings.Settings | ||
import org.opensearch.common.settings.SettingsFilter | ||
import org.opensearch.commons.alerting.action.AlertingActions | ||
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction | ||
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder | ||
import org.opensearch.commons.alerting.model.BucketLevelTrigger | ||
import org.opensearch.commons.alerting.model.ChainedAlertTrigger | ||
|
@@ -96,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger | |
import org.opensearch.commons.alerting.model.ScheduledJob | ||
import org.opensearch.commons.alerting.model.SearchInput | ||
import org.opensearch.commons.alerting.model.Workflow | ||
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger | ||
import org.opensearch.core.action.ActionResponse | ||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry | ||
import org.opensearch.core.common.io.stream.StreamInput | ||
|
@@ -110,6 +113,7 @@ import org.opensearch.painless.spi.AllowlistLoader | |
import org.opensearch.painless.spi.PainlessExtension | ||
import org.opensearch.percolator.PercolatorPluginExt | ||
import org.opensearch.plugins.ActionPlugin | ||
import org.opensearch.plugins.ExtensiblePlugin | ||
import org.opensearch.plugins.ReloadablePlugin | ||
import org.opensearch.plugins.ScriptPlugin | ||
import org.opensearch.plugins.SearchPlugin | ||
|
@@ -162,6 +166,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
lateinit var alertIndices: AlertIndices | ||
lateinit var clusterService: ClusterService | ||
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator | ||
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf() | ||
|
||
override fun getRestHandlers( | ||
settings: Settings, | ||
|
@@ -236,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
ClusterMetricsInput.XCONTENT_REGISTRY, | ||
DocumentLevelTrigger.XCONTENT_REGISTRY, | ||
ChainedAlertTrigger.XCONTENT_REGISTRY, | ||
RemoteMonitorTrigger.XCONTENT_REGISTRY, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we also need RemoteMonitorInput xcontent registry? if yes, can you add appropriate tests to serialize There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we dont need |
||
Workflow.XCONTENT_REGISTRY | ||
) | ||
} | ||
|
@@ -277,6 +283,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
.registerLockService(lockService) | ||
.registerConsumers() | ||
.registerDestinationSettings() | ||
.registerRemoteMonitors(monitorTypeToMonitorRunners) | ||
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) | ||
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService) | ||
scheduler = JobScheduler(threadPool, runner) | ||
|
@@ -409,4 +416,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
) | ||
) | ||
} | ||
|
||
override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) { | ||
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) { | ||
val monitorTypesToMonitorRunners = monitorExtension.getMonitorTypesToMonitorRunners() | ||
|
||
for (monitorTypeToMonitorRunner in monitorTypesToMonitorRunners) { | ||
val monitorType = monitorTypeToMonitorRunner.key | ||
val monitorRunner = monitorTypeToMonitorRunner.value | ||
|
||
if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) { | ||
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner) | ||
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,10 +25,8 @@ import org.opensearch.action.get.GetResponse | |
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.index.IndexResponse | ||
import org.opensearch.action.support.WriteRequest | ||
import org.opensearch.alerting.model.MonitorMetadata | ||
import org.opensearch.alerting.opensearchapi.suspendUntil | ||
import org.opensearch.alerting.settings.AlertingSettings | ||
import org.opensearch.alerting.util.AlertingException | ||
import org.opensearch.alerting.util.IndexUtils | ||
import org.opensearch.client.Client | ||
import org.opensearch.cluster.service.ClusterService | ||
|
@@ -40,17 +38,16 @@ import org.opensearch.common.xcontent.XContentHelper | |
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.commons.alerting.model.DocLevelMonitorInput | ||
import org.opensearch.commons.alerting.model.Monitor | ||
import org.opensearch.commons.alerting.model.Monitor.MonitorType | ||
import org.opensearch.commons.alerting.model.MonitorMetadata | ||
import org.opensearch.commons.alerting.model.ScheduledJob | ||
import org.opensearch.commons.alerting.util.AlertingException | ||
import org.opensearch.core.rest.RestStatus | ||
import org.opensearch.core.xcontent.NamedXContentRegistry | ||
import org.opensearch.core.xcontent.ToXContent | ||
import org.opensearch.core.xcontent.XContentParser | ||
import org.opensearch.core.xcontent.XContentParserUtils | ||
import org.opensearch.index.seqno.SequenceNumbers | ||
import org.opensearch.transport.RemoteTransportException | ||
import java.util.Locale | ||
import kotlin.collections.HashMap | ||
|
||
private val log = LogManager.getLogger(MonitorMetadataService::class.java) | ||
|
||
|
@@ -188,10 +185,10 @@ object MonitorMetadataService : | |
|
||
suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata { | ||
try { | ||
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) | ||
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we just have a separate flag instead of depending on name check can be |
||
(monitor.inputs[0] as DocLevelMonitorInput).indices[0] | ||
else null | ||
val runContext = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) | ||
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) | ||
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>) | ||
else null | ||
return if (runContext != null) { | ||
|
@@ -211,13 +208,12 @@ object MonitorMetadataService : | |
createWithRunContext: Boolean, | ||
workflowMetadataId: String? = null, | ||
): MonitorMetadata { | ||
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) | ||
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) | ||
(monitor.inputs[0] as DocLevelMonitorInput).indices[0] | ||
else null | ||
val runContext = | ||
if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR && createWithRunContext) | ||
createFullRunContext(monitorIndex) | ||
else emptyMap() | ||
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) | ||
createFullRunContext(monitorIndex) | ||
else emptyMap() | ||
return MonitorMetadata( | ||
id = MonitorMetadata.getId(monitor, workflowMetadataId), | ||
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the same dependency used by jobscheduler or notification for shadow plugin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.