From 56bba0dd751eb6604c27be0ed8dd7cc62b2121dd Mon Sep 17 00:00:00 2001
From: Stevan Buzejic
Date: Fri, 27 Jan 2023 19:46:45 +0100
Subject: [PATCH 1/4] Added integration tests for checking workflow creation and update scenarios

Signed-off-by: Stevan Buzejic
---
 .../org/opensearch/alerting/AlertingPlugin.kt |   9 +-
 .../model/workflow/WorkflowRunResult.kt       |   3 +-
 .../TransportIndexCompositeWorkflowAction.kt  |  79 +++--
 .../alerting/workflow/WorkflowRunner.kt       |   1 -
 .../workflow/WorkflowRunnerService.kt         |   2 -
 .../alerting/MonitorDataSourcesIT.kt          |   1 -
 .../org/opensearch/alerting/TestHelpers.kt    | 112 ++++--
 .../opensearch/alerting/WorkflowMonitorIT.kt  | 327 ++++++++++++++++++
 .../transport/WorkflowSingleNodeTestCase.kt   |  58 ++++
 core/build.gradle                             |   2 +-
 10 files changed, 521 insertions(+), 73 deletions(-)
 create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt
 create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
index 66ebd5eb3..61439ea70 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
@@ -51,6 +51,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
 import org.opensearch.alerting.transport.TransportGetEmailGroupAction
 import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
 import org.opensearch.alerting.transport.TransportGetMonitorAction
+import org.opensearch.alerting.transport.TransportIndexCompositeWorkflowAction
 import org.opensearch.alerting.transport.TransportIndexMonitorAction
 import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
 import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
@@ -80,6 +81,7 @@ import org.opensearch.commons.alerting.model.Monitor
 import org.opensearch.commons.alerting.model.QueryLevelTrigger
 import org.opensearch.commons.alerting.model.ScheduledJob
 import org.opensearch.commons.alerting.model.SearchInput
+import org.opensearch.commons.alerting.model.Workflow
 import org.opensearch.env.Environment
 import org.opensearch.env.NodeEnvironment
 import org.opensearch.index.IndexModule
@@ -180,8 +182,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
             ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
             ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
             ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
-            ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)
-
+            ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
+            ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexCompositeWorkflowAction::class.java)
         )
     }

@@ -193,7 +195,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
             QueryLevelTrigger.XCONTENT_REGISTRY,
             BucketLevelTrigger.XCONTENT_REGISTRY,
             ClusterMetricsInput.XCONTENT_REGISTRY,
-            DocumentLevelTrigger.XCONTENT_REGISTRY
+            DocumentLevelTrigger.XCONTENT_REGISTRY,
+            Workflow.XCONTENT_REGISTRY
         )
     }

diff --git
a/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt index dc643e716..cc6b61745 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt @@ -1,4 +1,3 @@ package org.opensearch.alerting.model.workflow -data class WorkflowRunResult { -} +data class WorkflowRunResult(private val someArg: String) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt index a6273a91c..1ea721b39 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt @@ -166,7 +166,13 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( ) { fun resolveUserAndStart() { scope.launch { - validateRequest(request, actionListener) + try { + validateRequest(request) + } catch (e: Exception) { + actionListener.onFailure(e) + return@launch + } + if (user == null) { // Security is disabled, add empty user to Monitor. user is null for older versions. request.workflow = request.workflow @@ -456,68 +462,72 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( } } - suspend fun validateRequest(request: IndexWorkflowRequest, listener: ActionListener) { - val compositeInput = request.workflow.inputs.get(0) as CompositeInput + suspend fun validateRequest(request: IndexWorkflowRequest) { + val compositeInput = request.workflow.inputs[0] as CompositeInput val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) - validateDuplicateDelegateMonitorReferenceExists(monitorIds, listener) - validateSequenceOrdering(compositeInput.sequence.delegates, listener) - validateChainedFindings(compositeInput.sequence.delegates, listener) - val delegateMonitors = getDelegateMonitors(monitorIds, listener) - validateDelegateMonitorsExist(monitorIds, delegateMonitors, listener) + + if (monitorIds.isNullOrEmpty()) + throw AlertingException.wrap(IllegalArgumentException("Delegates list can not be empty.")) + + validateDuplicateDelegateMonitorReferenceExists(monitorIds) + validateSequenceOrdering(compositeInput.sequence.delegates) + validateChainedFindings(compositeInput.sequence.delegates) + val delegateMonitors = getDelegateMonitors(monitorIds) + validateDelegateMonitorsExist(monitorIds, delegateMonitors) // todo: validate that user has roles to reference delegate monitors } - private fun validateChainedFindings(delegates: List, listener: ActionListener) { + private fun validateChainedFindings(delegates: List) { val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } delegates.forEach { if (it.chainedFindings != null) { if (monitorIdOrderMap.containsKey(it.chainedFindings!!.monitorId) == false) { - listener.onFailure(Exception("Chained Findings Monitor ${it.chainedFindings!!.monitorId} doesn't exist in sequence")) + throw AlertingException.wrap( + IllegalArgumentException( + "Chained Findings Monitor ${it.chainedFindings!!.monitorId} doesn't exist in sequence" + ) + ) } - if (it.order <= monitorIdOrderMap.get(it.chainedFindings!!.monitorId)!!) 
{ - listener.onFailure( - Exception( + if (it.order <= monitorIdOrderMap[it.chainedFindings!!.monitorId]!!) + throw AlertingException.wrap( + IllegalArgumentException( "Chained Findings Monitor ${it.chainedFindings!!.monitorId} should be executed before monitor ${it.monitorId}" ) ) - } } } } - private fun validateSequenceOrdering(delegates: List, listener: ActionListener) { + private fun validateSequenceOrdering(delegates: List) { val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) if (orderSet.size != delegates.size) { - listener.onFailure(Exception("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) + throw AlertingException.wrap(IllegalArgumentException("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) } } private fun validateDuplicateDelegateMonitorReferenceExists( - monitorIds: MutableList, - listener: ActionListener + monitorIds: MutableList ) { if (monitorIds.toSet().size != monitorIds.size) { - listener.onFailure(Exception("duplicate is not allowed")) + throw AlertingException.wrap(IllegalArgumentException("Duplicate delegates not allowed")) } } private fun validateDelegateMonitorsExist( monitorIds: List, - delegateMonitors: List, - actionListener: ActionListener + delegateMonitors: List ) { val reqMonitorIds: MutableList = monitorIds as MutableList delegateMonitors.forEach { reqMonitorIds.remove(it.id) } if (reqMonitorIds.isNotEmpty()) { - actionListener.onFailure(Exception("${reqMonitorIds.joinToString { "," }} are not valid monitor ids")) + throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids"))) } } private suspend fun getDelegateMonitors( - monitorIds: MutableList, - actionListener: ActionListener + monitorIds: MutableList ): List { val query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", monitorIds)) val searchSource = SearchSourceBuilder().query(query) @@ -527,20 +537,15 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( if (response.isTimedOut) { return monitors } - try { - for (hit in response.hits) { - XContentType.JSON.xContent().createParser( - xContentRegistry, - LoggingDeprecationHandler.INSTANCE, hit.sourceAsString - ).use { hitsParser -> - val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) - monitors.add(monitor as Monitor) - } + for (hit in response.hits) { + XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.sourceAsString + ).use { hitsParser -> + val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) + monitors.add(monitor as Monitor) } - return monitors - } catch (e: Exception) { - actionListener.onFailure(e) - return listOf() } + return monitors } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt index a09d1cc99..0fc989800 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt @@ -2,7 +2,6 @@ package org.opensearch.alerting.workflow import org.opensearch.alerting.MonitorRunnerExecutionContext import org.opensearch.alerting.model.MonitorRunResult -import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.Workflow import java.time.Instant diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt index f99d9a802..6379f2f55 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt @@ -180,7 +180,6 @@ object WorkflowRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompo } override fun postDelete(jobId: String) { - } override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) { @@ -195,7 +194,6 @@ object WorkflowRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompo suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): MonitorRunResult<*> { val workflow = job as Workflow return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun) - } // TODO: See if we can move below methods (or few of these) to a common utils diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 5afc1b7a5..019d99407 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -203,7 +203,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val finding = Finding.parse(xcp) findings1.add(finding) } - logger.error("sashank: response: {}", finalQueryResponse) val indexToRelatedDocIdsMap = mutableMapOf>() for (finding in findings1) { val ids = indexToRelatedDocIdsMap.getOrDefault(index, mutableListOf()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index a4e3eb347..4e828b12c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -39,8 +39,11 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -51,7 +54,10 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Sequence import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.Workflow.WorkflowType import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -84,7 +90,7 @@ fun randomQueryLevelMonitor( 
triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -102,7 +108,7 @@ fun randomQueryLevelMonitorWithoutUser( triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -126,7 +132,7 @@ fun randomBucketLevelMonitor( triggers: List = (1..randomInt(10)).map { randomBucketLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -151,7 +157,7 @@ fun randomBucketLevelMonitor( enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false, - dataSources: DataSources + dataSources: DataSources, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -170,7 +176,7 @@ fun randomClusterMetricsMonitor( triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs, @@ -188,7 +194,7 @@ fun randomDocumentLevelMonitor( triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -208,7 +214,7 @@ fun randomDocumentLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false, dataSources: DataSources, - owner: String? = null + owner: String? = null, ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, @@ -217,13 +223,63 @@ fun randomDocumentLevelMonitor( ) } +fun randomWorkflowMonitor( + monitorIds: List, + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + enabledTime: Instant? 
= if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS) +): Workflow { + val delegates = mutableListOf() + if (!monitorIds.isNullOrEmpty()) { + delegates.add(Delegate(1, monitorIds[0])) + for (i in 1 until monitorIds.size) { + delegates.add(Delegate(i + 1, monitorIds [i], ChainedFindings(monitorIds[i - 1]))) + } + } + + return Workflow( + name = name, + enabled = enabled, + schedule = schedule, + lastUpdateTime = lastUpdateTime, + enabledTime = enabledTime, + workflowType = WorkflowType.COMPOSITE, + user = user, + inputs = listOf(CompositeInput(Sequence(delegates))) + ) +} + +fun randomWorkflowMonitorWithDelegates( + delegates: List, + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + return Workflow( + name = name, + enabled = enabled, + schedule = schedule, + lastUpdateTime = lastUpdateTime, + enabledTime = enabledTime, + workflowType = WorkflowType.COMPOSITE, + user = user, + inputs = listOf(CompositeInput(Sequence(delegates))) + ) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), severity: String = "1", condition: Script = randomScript(), actions: List = mutableListOf(), - destinationId: String = "" + destinationId: String = "", ): QueryLevelTrigger { return QueryLevelTrigger( id = id, @@ -240,7 +296,7 @@ fun randomBucketLevelTrigger( severity: String = "1", bucketSelector: BucketSelectorExtAggregationBuilder = randomBucketSelectorExtAggregationBuilder(name = id), actions: List = mutableListOf(), - destinationId: String = "" + destinationId: String = "", ): BucketLevelTrigger { return BucketLevelTrigger( id = id, @@ -260,7 +316,7 @@ fun randomDocumentLevelTrigger( severity: String = "1", condition: Script = randomScript(), actions: List = mutableListOf(), - destinationId: String = "" + destinationId: String = "", ): DocumentLevelTrigger { return DocumentLevelTrigger( id = id, @@ -278,14 +334,14 @@ fun randomBucketSelectorExtAggregationBuilder( bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), script: Script = randomBucketSelectorScript(params = bucketsPathsMap), parentBucketPath: String = "testPath", - filter: BucketSelectorExtFilter = BucketSelectorExtFilter(IncludeExclude("foo*", "bar*")) + filter: BucketSelectorExtFilter = BucketSelectorExtFilter(IncludeExclude("foo*", "bar*")), ): BucketSelectorExtAggregationBuilder { return BucketSelectorExtAggregationBuilder(name, bucketsPathsMap, script, parentBucketPath, filter) } fun randomBucketSelectorScript( idOrCode: String = "params.avg >= 0", - params: Map = mutableMapOf("avg" to "10") + params: Map = mutableMapOf("avg" to "10"), ): Script { return Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG, idOrCode, emptyMap(), params) } @@ -298,7 +354,7 @@ fun randomEmailAccount( port: Int = randomIntBetween(1, 100), method: EmailAccount.MethodType = randomEmailAccountMethod(), username: SecureString? = null, - password: SecureString? = null + password: SecureString? 
= null, ): EmailAccount { return EmailAccount( name = name, @@ -316,7 +372,7 @@ fun randomEmailGroup( name: String = salt + OpenSearchRestTestCase.randomAlphaOfLength(10), emails: List = (1..randomInt(10)).map { EmailEntry(email = salt + OpenSearchRestTestCase.randomAlphaOfLength(5) + "@email.com") - } + }, ): EmailGroup { return EmailGroup(name = name, emails = emails) } @@ -342,7 +398,7 @@ val TERM_DLS_QUERY = """{\"term\": { \"accessible\": true}}""" fun randomTemplateScript( source: String, - params: Map = emptyMap() + params: Map = emptyMap(), ): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params) fun randomAction( @@ -350,7 +406,7 @@ fun randomAction( template: Script = randomTemplateScript("Hello World"), destinationId: String = "", throttleEnabled: Boolean = false, - throttle: Throttle = randomThrottle() + throttle: Throttle = randomThrottle(), ) = Action(name, destinationId, template, template, throttleEnabled, throttle, actionExecutionPolicy = null) fun randomActionWithPolicy( @@ -359,7 +415,7 @@ fun randomActionWithPolicy( destinationId: String = "", throttleEnabled: Boolean = false, throttle: Throttle = randomThrottle(), - actionExecutionPolicy: ActionExecutionPolicy? = randomActionExecutionPolicy() + actionExecutionPolicy: ActionExecutionPolicy? = randomActionExecutionPolicy(), ): Action { return if (actionExecutionPolicy?.actionExecutionScope is PerExecutionActionScope) { // Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it @@ -371,11 +427,11 @@ fun randomActionWithPolicy( fun randomThrottle( value: Int = randomIntBetween(60, 120), - unit: ChronoUnit = ChronoUnit.MINUTES + unit: ChronoUnit = ChronoUnit.MINUTES, ) = Throttle(value, unit) fun randomActionExecutionPolicy( - actionExecutionScope: ActionExecutionScope = randomActionExecutionScope() + actionExecutionScope: ActionExecutionScope = randomActionExecutionScope(), ) = ActionExecutionPolicy(actionExecutionScope) fun randomActionExecutionScope(): ActionExecutionScope { @@ -400,7 +456,7 @@ fun randomDocLevelQuery( id: String = OpenSearchRestTestCase.randomAlphaOfLength(10), query: String = OpenSearchRestTestCase.randomAlphaOfLength(10), name: String = "${randomInt(5)}", - tags: List = mutableListOf(0..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) } + tags: List = mutableListOf(0..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) }, ): DocLevelQuery { return DocLevelQuery(id = id, query = query, name = name, tags = tags) } @@ -408,7 +464,7 @@ fun randomDocLevelQuery( fun randomDocLevelMonitorInput( description: String = OpenSearchRestTestCase.randomAlphaOfLength(randomInt(10)), indices: List = listOf(1..randomInt(10)).map { OpenSearchRestTestCase.randomAlphaOfLength(10) }, - queries: List = listOf(1..randomInt(10)).map { randomDocLevelQuery() } + queries: List = listOf(1..randomInt(10)).map { randomDocLevelQuery() }, ): DocLevelMonitorInput { return DocLevelMonitorInput(description = description, indices = indices, queries = queries) } @@ -420,7 +476,7 @@ fun randomFinding( monitorName: String = OpenSearchRestTestCase.randomAlphaOfLength(10), index: String = OpenSearchRestTestCase.randomAlphaOfLength(10), docLevelQueries: List = listOf(randomDocLevelQuery()), - timestamp: Instant = Instant.now() + timestamp: Instant = Instant.now(), ): Finding { return Finding( id = id, @@ -456,7 +512,7 @@ fun randomEmailAccountMethod(): EmailAccount.MethodType { fun randomActionExecutionResult( actionId: String = 
UUIDs.base64UUID(), lastExecutionTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - throttledCount: Int = randomInt() + throttledCount: Int = randomInt(), ) = ActionExecutionResult(actionId, lastExecutionTime, throttledCount) fun randomQueryLevelMonitorRunResult(): MonitorRunResult { @@ -518,7 +574,7 @@ fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { fun randomClusterMetricsInput( path: String = ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH.defaultPath, pathParams: String = "", - url: String = "" + url: String = "", ): ClusterMetricsInput { return ClusterMetricsInput(path, pathParams, url) } @@ -617,7 +673,7 @@ fun RestClient.makeRequest( endpoint: String, params: Map = emptyMap(), entity: HttpEntity? = null, - vararg headers: Header + vararg headers: Header, ): Response { val request = Request(method, endpoint) // TODO: remove PERMISSIVE option after moving system index access to REST API call @@ -642,7 +698,7 @@ fun RestClient.makeRequest( method: String, endpoint: String, entity: HttpEntity? = null, - vararg headers: Header + vararg headers: Header, ): Response { val request = Request(method, endpoint) val options = RequestOptions.DEFAULT.toBuilder() @@ -686,3 +742,7 @@ fun assertUserNull(map: Map) { fun assertUserNull(monitor: Monitor) { assertNull("User is not null", monitor.user) } + +fun assertUserNull(workflow: Workflow) { + assertNull("User is not null", workflow.user) +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt new file mode 100644 index 000000000..239eba265 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -0,0 +1,327 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase +import org.opensearch.commons.alerting.model.ChainedFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.rest.RestRequest +import java.util.Collections + +class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { + + fun `test create workflow success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! 
+ val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + val workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) + + // Delegate verification + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse1.id, delegate2.chainedFindings!!.monitorId + ) + } + + fun `test update workflow success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + var workflowById = searchWorkflow(workflowResponse.id)!! 
+ assertNotNull(workflowById) + + val monitor3 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse3 = createMonitor(monitor3)!! + + val updatedWorkflowResponse = upsertWorkflow( + randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id, monitorResponse3.id) + ), + workflowResponse.id, + RestRequest.Method.PUT + )!! + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse.workflow) + assertEquals("Workflow id changed", workflowResponse.id, updatedWorkflowResponse.id) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = searchWorkflow(updatedWorkflowResponse.id)!! + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", updatedWorkflowResponse.workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", updatedWorkflowResponse.workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) + + // Delegate verification + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 3, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse1.id, delegate2.chainedFindings!!.monitorId + ) + + val delegate3 = delegates[2] + assertNotNull(delegate3) + assertEquals("Delegate3 order not correct", 3, delegate3.order) + assertEquals("Delegate3 id not correct", monitorResponse3.id, delegate3.monitorId) + assertEquals( + "Delegate3 Chained finding not correct", monitorResponse2.id, delegate3.chainedFindings!!.monitorId + ) + } + + fun `test create workflow without delegate failure`() { + val workflow = randomWorkflowMonitor( + monitorIds = Collections.emptyList() + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test create workflow duplicate delegate failure`() { + val workflow = randomWorkflowMonitor( + monitorIds = listOf("1", "1", "2") + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test create workflow delegate monitor doesn't exist failure`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val 
customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor)!! + + val workflow = randomWorkflowMonitor( + monitorIds = listOf("-1", monitorResponse.id) + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test create workflow sequence order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test create workflow chained findings monitor not in sequence failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedFindings("monitor-x")) + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow chained findings order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedFindings("monitor-2")) + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt new file mode 100644 index 000000000..3c1eeee79 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.query.TermQueryBuilder +import org.opensearch.index.seqno.SequenceNumbers +import 
org.opensearch.rest.RestRequest +import org.opensearch.search.builder.SearchSourceBuilder +/** + * A test that keep a singleton node started for all tests that can be used to get + * references to Guice injectors in unit tests. + */ + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { + + protected fun searchWorkflow(id: String, indices: String = ScheduledJob.SCHEDULED_JOBS_INDEX, refresh: Boolean = true): Workflow? { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return null + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder("_id", id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { it -> + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + Workflow.parse(xcp, it.id, it.version) + }.first() + } + + protected fun upsertWorkflow(workflow: Workflow, id: String = Workflow.NO_ID, method: RestRequest.Method = RestRequest.Method.POST): IndexWorkflowResponse? { + val request = IndexWorkflowRequest( + workflowId = id, + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + refreshPolicy = WriteRequest.RefreshPolicy.parse("true"), + method = method, + workflow = workflow + ) + + return client().execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, request).actionGet() + } +} diff --git a/core/build.gradle b/core/build.gradle index ce258112b..f4432bb06 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -15,7 +15,7 @@ dependencies { implementation "com.cronutils:cron-utils:9.1.6" api "org.opensearch.client:opensearch-rest-client:${opensearch_version}" implementation 'com.google.googlejavaformat:google-java-format:1.10.0' - api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar") + api files("/home/stevan/git/opensearch/repo/common-utils/build/libs/common-utils-2.5.0.0-SNAPSHOT.jar") implementation 'commons-validator:commons-validator:1.7' testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" From e0af3059598db04a8c6a94eb473730e5b76d0e5d Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Tue, 31 Jan 2023 18:58:51 +0100 Subject: [PATCH 2/4] Added transport layer for getting and deleting the workflow Signed-off-by: Stevan Buzejic --- .../org/opensearch/alerting/AlertingPlugin.kt | 6 +- .../TransportDeleteWorkflowAction.kt | 145 ++++++++++++++ .../transport/TransportGetWorkflowAction.kt | 41 ++-- .../opensearch/alerting/WorkflowMonitorIT.kt | 183 +++++++++++++++++- .../transport/WorkflowSingleNodeTestCase.kt | 12 ++ .../resources/mappings/scheduled-jobs.json | 39 ++-- 6 files changed, 395 insertions(+), 31 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 61439ea70..c4151527e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -44,6 +44,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import 
org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction +import org.opensearch.alerting.transport.TransportDeleteWorkflowAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction import org.opensearch.alerting.transport.TransportGetAlertsAction import org.opensearch.alerting.transport.TransportGetDestinationsAction @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction +import org.opensearch.alerting.transport.TransportGetWorkflowAction import org.opensearch.alerting.transport.TransportIndexCompositeWorkflowAction import org.opensearch.alerting.transport.TransportIndexMonitorAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction @@ -183,7 +185,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java), - ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexCompositeWorkflowAction::class.java) + ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexCompositeWorkflowAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java) ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt new file mode 100644 index 000000000..e0c011c45 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import 
org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +class TransportDeleteWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val clusterService: ClusterService, + settings: Settings, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest +), + SecureTransportAction { + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + val transformedRequest = request as? DeleteWorkflowRequest + ?: recreateObject(request) { DeleteWorkflowRequest(it) } + + val user = readUserFromThreadContext(client) + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId) + .setRefreshPolicy(transformedRequest.refreshPolicy) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteWorkflowAction")) { + DeleteWorkflowHandler(client, actionListener, deleteRequest, user, transformedRequest.workflowId).resolveUserAndStart() + } + } + + inner class DeleteWorkflowHandler( + private val client: Client, + private val actionListener: ActionListener, + private val deleteRequest: DeleteRequest, + private val user: User?, + private val workflowId: String + ) { + suspend fun resolveUserAndStart() { + try { + val workflow = getWorkflow() + + val canDelete = user == null || + !doFilterForUser(user) || + checkUserPermissionsWithResource( + user, + workflow.user, + actionListener, + "workflow", + workflowId + ) + + if (canDelete) { + val deleteResponse = deleteWorkflow(workflow) + // TODO - uncomment once the workflow metadata is added + // deleteMetadata(workflow) + actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) + } else { + actionListener.onFailure( + AlertingException( + "Not allowed to delete this workflow!", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + } + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun getWorkflow(): Workflow { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId) + + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + if (getResponse.isExists == false) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Workflow with $workflowId is not found", RestStatus.NOT_FOUND) + ) + ) + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + return ScheduledJob.parse(xcp, 
getResponse.id, getResponse.version) as Workflow + } + + private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse { + return client.suspendUntil { delete(deleteRequest, it) } + } + + private suspend fun deleteMetadata(workflow: Workflow) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata") + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt index 80c61ad85..fcd3ffd52 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.alerting.transport import org.opensearch.OpenSearchStatusException @@ -6,9 +11,6 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.alerting.action.GetMonitorAction -import org.opensearch.alerting.action.GetMonitorRequest -import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -19,8 +21,11 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -32,8 +37,8 @@ class TransportGetWorkflowAction @Inject constructor( val xContentRegistry: NamedXContentRegistry, val clusterService: ClusterService, settings: Settings -) : HandledTransportAction( - GetMonitorAction.NAME, transportService, actionFilters, ::GetMonitorRequest +) : HandledTransportAction( + AlertingActions.GET_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::GetWorkflowRequest ), SecureTransportAction { @@ -43,12 +48,12 @@ class TransportGetWorkflowAction @Inject constructor( listenFilterBySettingChange(clusterService) } - override fun doExecute(task: Task, getMonitorRequest: GetMonitorRequest, actionListener: ActionListener) { + override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener) { val user = readUserFromThreadContext(client) - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getMonitorRequest.monitorId) - .version(getMonitorRequest.version) - .fetchSourceContext(getMonitorRequest.srcContext) + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId) + .version(getWorkflowRequest.version) + .fetchSourceContext(getWorkflowRequest.srcContext) if (!validateUserBackendRoles(user, 
actionListener)) { return @@ -69,7 +74,7 @@ class TransportGetWorkflowAction @Inject constructor( actionListener.onFailure( AlertingException.wrap( OpenSearchStatusException( - "Monitor not found.", + "Workflow not found.", RestStatus.NOT_FOUND ) ) @@ -77,21 +82,21 @@ class TransportGetWorkflowAction @Inject constructor( return } - var monitor: Monitor? = null + var workflow: Workflow? = null if (!response.isSourceEmpty) { XContentHelper.createParser( xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON ).use { xcp -> - monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor + workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow // security is enabled and filterby is enabled if (!checkUserPermissionsWithResource( user, - monitor?.user, + workflow?.user, actionListener, - "monitor", - getMonitorRequest.monitorId + "workflow", + getWorkflowRequest.workflowId ) ) { return @@ -100,13 +105,13 @@ class TransportGetWorkflowAction @Inject constructor( } actionListener.onResponse( - GetMonitorResponse( + GetWorkflowResponse( response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, - monitor + workflow ) ) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 239eba265..1745e7155 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -5,7 +5,11 @@ package org.opensearch.alerting +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest import org.opensearch.commons.alerting.model.ChainedFindings import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.DataSources @@ -88,7 +92,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) } - fun `test update workflow success`() { + fun `test update workflow add monitor success`() { val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(docQuery1) @@ -192,6 +196,183 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) } + fun `test update workflow remove monitor success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val 
monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + var workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + val updatedWorkflowResponse = upsertWorkflow( + randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id) + ), + workflowResponse.id, + RestRequest.Method.PUT + )!! + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse.workflow) + assertEquals("Workflow id changed", workflowResponse.id, updatedWorkflowResponse.id) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = searchWorkflow(updatedWorkflowResponse.id)!! + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", updatedWorkflowResponse.workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", updatedWorkflowResponse.workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) + + // Delegate verification + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + } + + fun `test get workflow`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = upsertWorkflow(workflowRequest)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + assertNotNull(getWorkflowResponse) + + val workflowById = getWorkflowResponse.workflow!! 
+ // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, getWorkflowResponse.id) + assertTrue("incorrect version", getWorkflowResponse.version > 0) + assertEquals("Workflow name not correct", workflowRequest.name, workflowById.name) + assertEquals("Workflow owner not correct", workflowRequest.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflowRequest.inputs, workflowById.inputs) + + // Delegate verification + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate = delegates[0] + assertNotNull(delegate) + assertEquals("Delegate order not correct", 1, delegate.order) + assertEquals("Delegate id not correct", monitorResponse.id, delegate.monitorId) + } + + fun `test delete workflow`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, DeleteWorkflowRequest(workflowId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get() + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + fun `test create workflow without delegate failure`() { val workflow = randomWorkflowMonitor( monitorIds = Collections.emptyList() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 3c1eeee79..1ced2e2ae 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -9,6 +9,8 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.support.WriteRequest import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.model.ScheduledJob @@ -17,6 
+19,8 @@ import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext + /** * A test that keep a singleton node started for all tests that can be used to get * references to Guice injectors in unit tests. @@ -55,4 +59,12 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { return client().execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, request).actionGet() } + + protected fun getWorkflowById( + id: String, + version: Long = 1L, + fetchSourceContext: FetchSourceContext = FetchSourceContext.FETCH_SOURCE + ): GetWorkflowResponse { + return client().execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, GetWorkflowRequest(id, version, RestRequest.Method.GET, fetchSourceContext)).get() + } } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 768f73a9a..29f499ba1 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -404,20 +404,37 @@ "inputs": { "type": "nested", "properties": { - "search": { + "sequence": { "properties": { - "indices": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 + "delegates": { + "type": "nested", + "properties": { + "order": { + "type": "integer" + }, + "monitorId": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "chainedFindings": { + "properties": { + "monitorId": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } } } - }, - "query": { - "type": "object", - "enabled": false } } } From feebf0e3f769eba0b5e849813b10caffc95533cc Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 23 Feb 2023 22:25:59 +0100 Subject: [PATCH 3/4] Updated getting and deleting the workflow in order to check if the monitor index is not initialized yet. 
Added workflow crud test cases Signed-off-by: Stevan Buzejic --- .../TransportDeleteWorkflowAction.kt | 21 +- .../transport/TransportGetWorkflowAction.kt | 12 +- .../TransportIndexCompositeWorkflowAction.kt | 6 + .../org/opensearch/alerting/TestHelpers.kt | 4 + .../opensearch/alerting/WorkflowMonitorIT.kt | 394 ++++++++++++++++-- .../transport/WorkflowSingleNodeTestCase.kt | 8 + 6 files changed, 400 insertions(+), 45 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index e0c011c45..f50042052 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest @@ -18,6 +19,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException @@ -36,10 +38,13 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject +import org.opensearch.index.IndexNotFoundException import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java) + class TransportDeleteWorkflowAction @Inject constructor( transportService: TransportService, val client: Client, @@ -64,7 +69,7 @@ class TransportDeleteWorkflowAction @Inject constructor( val user = readUserFromThreadContext(client) val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId) - .setRefreshPolicy(transformedRequest.refreshPolicy) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) if (!validateUserBackendRoles(user, actionListener)) { return @@ -111,7 +116,16 @@ class TransportDeleteWorkflowAction @Inject constructor( ) } } catch (t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } } } @@ -122,7 +136,7 @@ class TransportDeleteWorkflowAction @Inject constructor( if (getResponse.isExists == false) { actionListener.onFailure( AlertingException.wrap( - OpenSearchStatusException("Workflow with $workflowId is not found", RestStatus.NOT_FOUND) + OpenSearchStatusException("Workflow not found.", RestStatus.NOT_FOUND) ) ) } @@ -134,6 +148,7 @@ class TransportDeleteWorkflowAction @Inject constructor( } private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse { + log.debug("Deleting the workflow with id 
${deleteRequest.id()}") return client.suspendUntil { delete(deleteRequest, it) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt index fcd3ffd52..f0802da4d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -26,6 +26,7 @@ import org.opensearch.commons.alerting.action.GetWorkflowRequest import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.IndexNotFoundException import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -117,7 +118,16 @@ class TransportGetWorkflowAction @Inject constructor( } override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } } } ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt index 1ea721b39..7ce4f48ee 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt @@ -463,6 +463,12 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( } suspend fun validateRequest(request: IndexWorkflowRequest) { + if (request.workflow.inputs.isEmpty()) + throw AlertingException.wrap(IllegalArgumentException("Input list can not be empty.")) + + if (request.workflow.inputs[0] !is CompositeInput) + throw AlertingException.wrap(IllegalArgumentException("When creating a workflow input must be CompositeInput")) + val compositeInput = request.workflow.inputs[0] as CompositeInput val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 4e828b12c..f858d7831 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -224,6 +224,7 @@ fun randomDocumentLevelMonitor( } fun randomWorkflowMonitor( + id: String = Workflow.NO_ID, monitorIds: List, name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), user: User? = randomUser(), @@ -241,6 +242,7 @@ fun randomWorkflowMonitor( } return Workflow( + id = id, name = name, enabled = enabled, schedule = schedule, @@ -253,6 +255,7 @@ fun randomWorkflowMonitor( } fun randomWorkflowMonitorWithDelegates( + id: String = Workflow.NO_ID, delegates: List, name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), user: User? 
= randomUser(), @@ -262,6 +265,7 @@ fun randomWorkflowMonitorWithDelegates( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), ): Workflow { return Workflow( + id = id, name = name, enabled = enabled, schedule = schedule, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 1745e7155..d5def95dc 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -5,11 +5,7 @@ package org.opensearch.alerting -import org.opensearch.action.admin.indices.refresh.RefreshRequest -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase -import org.opensearch.commons.alerting.action.AlertingActions -import org.opensearch.commons.alerting.action.DeleteWorkflowRequest import org.opensearch.commons.alerting.model.ChainedFindings import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.DataSources @@ -75,6 +71,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) // Delegate verification + @Suppress("UNCHECKED_CAST") val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } assertEquals("Delegates size not correct", 2, delegates.size) @@ -171,6 +168,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) // Delegate verification + @Suppress("UNCHECKED_CAST") val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } assertEquals("Delegates size not correct", 3, delegates.size) @@ -264,6 +262,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) // Delegate verification + @Suppress("UNCHECKED_CAST") val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } assertEquals("Delegates size not correct", 1, delegates.size) @@ -274,22 +273,13 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } fun `test get workflow`() { - val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val docLevelInput = DocLevelMonitorInput( - "description", listOf(index), listOf(docQuery1) + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) - val customFindingsIndex = "custom_findings_index" - val customFindingsIndexPattern = "custom_findings_index-1" - val customQueryIndex = "custom_alerts_index" val monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources( - queryIndex = customQueryIndex, - findingsIndex = customFindingsIndex, - findingsIndexPattern = customFindingsIndexPattern - ) ) val monitorResponse = createMonitor(monitor)!! 
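// The custom query/findings DataSources were dropped from this monitor, so it is created with the
// TestHelpers defaults; the workflow-level assertions below do not depend on those indices.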
@@ -316,6 +306,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertEquals("Workflow input not correct", workflowRequest.inputs, workflowById.inputs) // Delegate verification + @Suppress("UNCHECKED_CAST") val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } assertEquals("Delegates size not correct", 1, delegates.size) @@ -325,23 +316,52 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertEquals("Delegate id not correct", monitorResponse.id, delegate.monitorId) } - fun `test delete workflow`() { - val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + fun `test get workflow for invalid id monitor index doesn't exist`() { + // Get workflow for non existing workflow id + try { + getWorkflowById(id = "-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found") + ) + } + } + } + + fun `test get workflow for invalid id monitor index exists`() { val docLevelInput = DocLevelMonitorInput( - "description", listOf(index), listOf(docQuery1) + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) - val customFindingsIndex = "custom_findings_index" - val customFindingsIndexPattern = "custom_findings_index-1" - val customQueryIndex = "custom_alerts_index" val monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources( - queryIndex = customQueryIndex, - findingsIndex = customFindingsIndex, - findingsIndexPattern = customFindingsIndexPattern - ) + ) + createMonitor(monitor) + // Get workflow for non existing workflow id + try { + getWorkflowById(id = "-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found") + ) + } + } + } + + fun `test delete workflow`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) ) val monitorResponse = createMonitor(monitor)!! @@ -356,17 +376,89 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertNotNull(getWorkflowResponse) assertEquals(workflowId, getWorkflowResponse.id) - client().execute( - AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, DeleteWorkflowRequest(workflowId, WriteRequest.RefreshPolicy.IMMEDIATE) - ).get() - client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get() + deleteWorkflow(workflowId) // Verify that the workflow is deleted try { getWorkflowById(workflowId) } catch (e: Exception) { e.message?.let { assertTrue( - "Exception not returning IndexWorkflow Action error ", + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete monitor that is part of workflow sequence`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! 
+ + val workflowRequest = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + deleteWorkflow(workflowId) + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete workflow for invalid id monitor index doesn't exists`() { + // Try deleting non-existing workflow + try { + deleteWorkflow("-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning DeleteWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete workflow for invalid id monitor index exists`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + createMonitor(monitor) + // Try deleting non-existing workflow + try { + deleteWorkflow("-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning DeleteWorkflow Action error ", it.contains("Workflow not found.") ) } @@ -389,6 +481,47 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test update workflow without delegate failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflowMonitor( + id = workflowResponse.id, + monitorIds = Collections.emptyList() + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + fun `test create workflow duplicate delegate failure`() { val workflow = randomWorkflowMonitor( monitorIds = listOf("1", "1", "2") @@ -405,24 +538,50 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test update workflow duplicate delegate failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! 
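// Illustrative aside, not part of this patch: these negative tests still pass when no exception is
// thrown at all. A stricter variant could use LuceneTestCase.expectThrows, which is available to
// OpenSearchTestCase subclasses, e.g.
//     val e = expectThrows(Exception::class.java) { upsertWorkflow(workflow) }
//     assertTrue(e.message!!.contains("Delegates list can not be empty."))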
+ + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflowMonitor( + id = workflowResponse.id, + monitorIds = listOf("1", "1", "2") + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + fun `test create workflow delegate monitor doesn't exist failure`() { - val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val docLevelInput = DocLevelMonitorInput( - "description", listOf(index), listOf(docQuery1) + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) - val customFindingsIndex = "custom_findings_index" - val customFindingsIndexPattern = "custom_findings_index-1" - val customQueryIndex = "custom_alerts_index" val monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), - triggers = listOf(trigger), - dataSources = DataSources( - queryIndex = customQueryIndex, - findingsIndex = customFindingsIndex, - findingsIndexPattern = customFindingsIndexPattern - ) + triggers = listOf(trigger) ) val monitorResponse = createMonitor(monitor)!! @@ -441,6 +600,41 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test update workflow delegate monitor doesn't exist failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflowMonitor( + id = workflowResponse.id, + monitorIds = listOf("-1", monitorResponse.id) + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + fun `test create workflow sequence order not correct failure`() { val delegates = listOf( Delegate(1, "monitor-1"), @@ -462,6 +656,45 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test update workflow sequence order not correct failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! 
+ assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + workflow = randomWorkflowMonitorWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + fun `test create workflow chained findings monitor not in sequence failure`() { val delegates = listOf( Delegate(1, "monitor-1"), @@ -484,6 +717,46 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test update workflow chained findings monitor not in sequence failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedFindings("monitor-x")) + ) + workflow = randomWorkflowMonitorWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + fun `test create workflow chained findings order not correct failure`() { val delegates = listOf( Delegate(1, "monitor-1"), @@ -505,4 +778,43 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } } + + fun `test update workflow chained findings order not correct failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! 
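// NOTE: unlike the other update-path tests above, the workflow rebuilt below via
// randomWorkflowMonitorWithDelegates is not given id = workflowResponse.id, so this case
// effectively exercises the create path rather than an update of the existing workflow.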
+ assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedFindings("monitor-2")) + ) + workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 1ced2e2ae..626df7a9f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -9,6 +9,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.support.WriteRequest import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest import org.opensearch.commons.alerting.action.GetWorkflowRequest import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest @@ -67,4 +68,11 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { ): GetWorkflowResponse { return client().execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, GetWorkflowRequest(id, version, RestRequest.Method.GET, fetchSourceContext)).get() } + + protected fun deleteWorkflow(workflowId: String) { + client().execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, + DeleteWorkflowRequest(workflowId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + } } From 22eb90030ca9fc6e6a477c15c1236a299d5fdd13 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Mon, 27 Feb 2023 16:24:06 +0100 Subject: [PATCH 4/4] When deleting the monitor, added a check if the monitor is part of the workflow Signed-off-by: Stevan Buzejic --- .../transport/TransportDeleteMonitorAction.kt | 40 +++++++++++++++++-- .../opensearch/alerting/WorkflowMonitorIT.kt | 11 ++--- .../transport/AlertingSingleNodeTestCase.kt | 6 +++ .../transport/WorkflowSingleNodeTestCase.kt | 10 ++++- .../resources/mappings/scheduled-jobs.json | 39 ++++++++++-------- 5 files changed, 80 insertions(+), 26 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index ab57a0d45..89c0133c7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest @@ -17,6 +18,8 @@ import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.delete.DeleteResponse import 
org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.opensearchapi.suspendUntil @@ -35,6 +38,7 @@ import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.index.query.QueryBuilders @@ -42,6 +46,7 @@ import org.opensearch.index.reindex.BulkByScrollResponse import org.opensearch.index.reindex.DeleteByQueryAction import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import kotlin.coroutines.resume @@ -95,9 +100,10 @@ class TransportDeleteMonitorAction @Inject constructor( try { val monitor = getMonitor() - val canDelete = user == null || - !doFilterForUser(user) || - checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) + val canDelete = monitorIsNotInWorkflows(monitor.id) && ( + user == null || !doFilterForUser(user) || + checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) + ) if (canDelete) { val deleteResponse = deleteMonitor(monitor) @@ -114,6 +120,34 @@ class TransportDeleteMonitorAction @Inject constructor( } } + /** + * Checks if the monitor is part of the workflow + * + * @param monitorId - id of monitor that is checked if it is a workflow delegate + */ + private suspend fun monitorIsNotInWorkflows(monitorId: String): Boolean { + val queryBuilder = QueryBuilders.nestedQuery( + Workflow.WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.matchQuery( + Workflow.WORKFLOW_MONITOR_PATH, + monitorId + ) + ), + ScoreMode.None + ) + + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder).fetchSource(true)) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + if (searchResponse.hits.totalHits?.value == 0L) { + return true + } + return false + } + private suspend fun getMonitor(): Monitor { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index d5def95dc..56aab4537 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -406,6 +406,8 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val workflowRequest = randomWorkflowMonitor( monitorIds = listOf(monitorResponse.id) ) + + (workflowRequest.inputs.get(0) as CompositeInput).sequence.delegates.get(0).monitorId val workflowResponse = upsertWorkflow(workflowRequest)!! 
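// NOTE: the bare (workflowRequest.inputs.get(0) as CompositeInput).sequence.delegates.get(0).monitorId
// expression above only dereferences the first delegate's monitor id and discards the result; it has
// no effect on the test.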
val workflowId = workflowResponse.id val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) @@ -413,15 +415,14 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertNotNull(getWorkflowResponse) assertEquals(workflowId, getWorkflowResponse.id) - deleteWorkflow(workflowId) - // Verify that the workflow is deleted + // Verify that the monitor can't be deleted because it's included in the workflow try { - getWorkflowById(workflowId) + deleteMonitor(monitorResponse.id) } catch (e: Exception) { e.message?.let { assertTrue( - "Exception not returning GetWorkflow Action error ", - it.contains("Workflow not found.") + "Exception not returning DeleteMonitor Action error ", + it.contains("Not allowed to delete this monitor!") ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index 61e788a32..83ac60912 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -23,6 +23,8 @@ import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest +import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest @@ -178,6 +180,10 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { GetMonitorRequest(monitorId, version, RestRequest.Method.GET, fetchSourceContext) ).get() + protected fun deleteMonitor(monitorId: String): DeleteMonitorResponse = client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + override fun getPlugins(): List> { return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 626df7a9f..d9f36f721 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting.transport import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.support.WriteRequest +import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.DeleteWorkflowRequest @@ -44,7 +45,14 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { return searchResponse.hits.hits.map { it -> val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } - Workflow.parse(xcp, it.id, it.version) + lateinit var workflow: Workflow + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow" -> workflow = Workflow.parse(xcp) + } + } + 
workflow.copy(id = it.id, version = it.version) }.first() } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 29f499ba1..3a94f86ac 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -404,26 +404,18 @@ "inputs": { "type": "nested", "properties": { - "sequence": { + "composite_input": { + "type": "nested", "properties": { - "delegates": { - "type": "nested", + "sequence": { "properties": { - "order": { - "type": "integer" - }, - "monitorId": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } - } - }, - "chainedFindings": { + "delegates": { + "type": "nested", "properties": { - "monitorId": { + "order": { + "type": "integer" + }, + "monitor_id": { "type": "text", "fields": { "keyword": { @@ -431,6 +423,19 @@ "ignore_above": 256 } } + }, + "chained_findings": { + "properties": { + "monitor_id": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } } } }
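For reference, a minimal sketch (not part of this patch) of the nested query that the monitorIsNotInWorkflows guard in TransportDeleteMonitorAction issues against these renamed fields. The literal path strings are assumptions standing in for Workflow.WORKFLOW_DELEGATE_PATH and Workflow.WORKFLOW_MONITOR_PATH, whose actual values live in common-utils and are not shown in this diff.

import org.apache.lucene.search.join.ScoreMode
import org.opensearch.action.search.SearchRequest
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder

// Builds a search over the scheduled jobs index matching any workflow whose delegate list
// references the given monitor id. The two paths mirror the "composite_input" mapping above;
// substitute the real Workflow path constants when using this outside a sketch.
fun delegateReferenceSearch(monitorId: String): SearchRequest {
    val delegatePath = "workflow.inputs.composite_input.sequence.delegates"   // assumed constant value
    val monitorIdPath = "$delegatePath.monitor_id"                            // assumed constant value
    val query = QueryBuilders.nestedQuery(
        delegatePath,
        QueryBuilders.boolQuery().must(QueryBuilders.matchQuery(monitorIdPath, monitorId)),
        ScoreMode.None
    )
    return SearchRequest()
        .indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
        .source(SearchSourceBuilder().query(query).fetchSource(false))
}

A zero-hit response means no workflow references the monitor, which is exactly the condition the guard checks before allowing the delete to proceed.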