diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 073118dde..bb631a74d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -61,7 +61,7 @@ class InputService( val aggTriggerAfterKey: MutableMap = mutableMapOf() // If monitor execution is triggered from a workflow - val indexToDocIds = workflowRunContext?.matchingDocIdsPerIndex + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex // TODO: If/when multiple input queries are supported for Bucket-Level Monitor execution, aggTriggerAfterKeys will // need to be updated to account for it @@ -79,8 +79,8 @@ class InputService( val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers) // Rewrite query to consider the doc ids per given index - if (chainedFindingExist(indexToDocIds)) { - val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), indexToDocIds!!) + if (chainedFindingExist(matchingDocIdsPerIndex)) { + val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) rewrittenQuery.query(updatedSourceQuery) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt index e4b34fcce..ae018c843 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -7,8 +7,6 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchException -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.opensearchapi.suspendUntil @@ -17,19 +15,14 @@ import org.opensearch.client.Client import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob -import org.opensearch.index.query.BoolQueryBuilder -import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilders -import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.search.builder.SearchSourceBuilder -import java.util.stream.Collectors private val log = LogManager.getLogger(WorkflowService::class.java) @@ -134,70 +127,4 @@ class WorkflowService( } return monitors } - - suspend fun getDocIdsPerFindingIndex(monitorId: String, workflowExecutionId: String): Map> { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) - - val getResponse: GetResponse = client.suspendUntil { - client.get(getRequest, it) - } - - val monitor = if (!getResponse.isSourceEmpty) { - XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - getResponse.sourceAsBytesRef, XContentType.JSON - ).use { xcp -> - ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor - } - } else throw IllegalStateException("Delegate monitors don't exist $monitorId") - // Search findings index per monitor and workflow execution id - val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, monitor.id)) - .filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId)) - val searchRequest = SearchRequest() - .source( - SearchSourceBuilder() - .query(bqb) - .version(true) - .seqNoAndPrimaryTerm(true) - ) - .indices(monitor.dataSources.findingsIndex) - val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } - - // Get the findings docs - val findings = mutableListOf() - for (hit in searchResponse.hits) { - val xcp = XContentFactory.xContent(XContentType.JSON) - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - val finding = Finding.parse(xcp) - findings.add(finding) - } - - val indexToRelatedDocIdsMap = mutableMapOf>() - - for (finding in findings) { - indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds) - } - - val toTypedArray = indexToRelatedDocIdsMap.keys.stream().collect(Collectors.toList()).toTypedArray() - val searchFindings = SearchRequest().indices(*toTypedArray) - val queryBuilder = QueryBuilders.boolQuery() - indexToRelatedDocIdsMap.forEach { entry -> - queryBuilder - .should() - .add( - BoolQueryBuilder() - .must(MatchQueryBuilder("_index", entry.key)) - .must(TermsQueryBuilder("_id", entry.value)) - ) - } - searchFindings.source(SearchSourceBuilder().query(queryBuilder)) - val finalQueryResponse: SearchResponse = client.suspendUntil { client.search(searchFindings, it) } - - val indexDocIds = mutableMapOf>() - for (hit in finalQueryResponse.hits) { - indexDocIds.getOrPut(hit.index) { mutableListOf() }.add(hit.id) - } - return indexDocIds - } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt index eabe7470d..c07bcfdb4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt @@ -13,6 +13,7 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField import java.io.IOException import java.time.Instant @@ -46,7 +47,7 @@ data class WorkflowMetadata( if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) builder.field(WORKFLOW_ID_FIELD, workflowId) .field(MONITOR_IDS_FIELD, monitorIds) - .field(LATEST_RUN_TIME, latestRunTime) + .optionalTimeField(LATEST_RUN_TIME, latestRunTime) .field(LATEST_EXECUTION_ID, latestExecutionId) if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index a1f01abc0..5d6086a1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -35,7 +35,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParser -import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface import org.opensearch.commons.alerting.action.AlertingActions @@ -44,12 +43,8 @@ import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.action.DeleteWorkflowRequest import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.model.CompositeInput -import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow -import org.opensearch.commons.alerting.model.WorkflowInput -import org.opensearch.commons.alerting.util.IndexUtils -import org.opensearch.commons.alerting.util.instant import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.index.IndexNotFoundException @@ -58,11 +53,13 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import java.time.Instant -import java.util.Locale private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java) +/** + * Transport class that deletes the workflow. + * If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow + */ class TransportDeleteWorkflowAction @Inject constructor( transportService: TransportService, val client: Client, @@ -94,7 +91,14 @@ class TransportDeleteWorkflowAction @Inject constructor( } GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteWorkflowAction")) { - DeleteWorkflowHandler(client, actionListener, deleteRequest, transformedRequest.deleteUnderlyingMonitors, user, transformedRequest.workflowId).resolveUserAndStart() + DeleteWorkflowHandler( + client, + actionListener, + deleteRequest, + transformedRequest.deleteDelegateMonitors, + user, + transformedRequest.workflowId + ).resolveUserAndStart() } } @@ -102,7 +106,7 @@ class TransportDeleteWorkflowAction @Inject constructor( private val client: Client, private val actionListener: ActionListener, private val deleteRequest: DeleteRequest, - private val deleteUnderlyingMonitors: Boolean?, + private val deleteDelegateMonitors: Boolean?, private val user: User?, private val workflowId: String ) { @@ -122,11 +126,10 @@ class TransportDeleteWorkflowAction @Inject constructor( if (canDelete) { val deleteResponse = deleteWorkflow(workflow) - // TODO - uncomment once the workflow metadata is added - // deleteMetadata(workflow) - if (deleteUnderlyingMonitors == true) { - val underlyingMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds() - val monitorIdsToBeDeleted = monitorsAreNotInDifferentWorkflows(workflowId, underlyingMonitorIds) + deleteMetadata(workflow) + if (deleteDelegateMonitors == true) { + val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds() + val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds) // Delete the monitor ids if (!monitorIdsToBeDeleted.isNullOrEmpty()) { @@ -169,7 +172,13 @@ class TransportDeleteWorkflowAction @Inject constructor( } } - private suspend fun monitorsAreNotInDifferentWorkflows(workflowIdToBeDeleted: String, monitorIds: List): List { + /** + * Returns lit of monitor ids belonging only to a given workflow + * @param workflowIdToBeDeleted Id of the workflow that should be deleted + * @param monitorIds List of delegate monitor ids (underlying monitor ids) + */ + private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List): List { + // Retrieve monitors belonging to another workflows val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter( QueryBuilders.nestedQuery( Workflow.WORKFLOW_DELEGATE_PATH, @@ -204,85 +213,10 @@ class TransportDeleteWorkflowAction @Inject constructor( workflow.copy(id = hit.id, version = hit.version) } val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct() - + // Monitors that can be deleted -> all monitors - monitors belonging to another workflows return monitorIds.minus(workflowMonitors.toSet()) } - fun parse(xcp: XContentParser, id: String = Workflow.NO_ID, version: Long = Workflow.NO_VERSION): Workflow { - var name: String? = null - var workflowType: String = Workflow.WorkflowType.COMPOSITE.toString() - var user: User? = null - var schedule: Schedule? = null - var lastUpdateTime: Instant? = null - var enabledTime: Instant? = null - var enabled = true - var schemaVersion = IndexUtils.NO_SCHEMA_VERSION - val inputs: MutableList = mutableListOf() - var owner = "alerting" - - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) - while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { - val fieldName = xcp.currentName() - xcp.nextToken() - - when (fieldName) { - Workflow.SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() - Workflow.NAME_FIELD -> name = xcp.text() - Workflow.WORKFLOW_TYPE_FIELD -> { - workflowType = xcp.text() - val allowedTypes = Workflow.WorkflowType.values().map { it.value } - if (!allowedTypes.contains(workflowType)) { - throw IllegalStateException("Workflow type should be one of $allowedTypes") - } - } - Workflow.USER_FIELD -> { - user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) - } - Workflow.ENABLED_FIELD -> enabled = xcp.booleanValue() - Workflow.SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) - Workflow.INPUTS_FIELD -> { - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_ARRAY, - xcp.currentToken(), - xcp - ) - while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { - val input = WorkflowInput.parse(xcp) - inputs.add(input) - } - } - Workflow.ENABLED_TIME_FIELD -> enabledTime = xcp.instant() - Workflow.LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() - Workflow.OWNER_FIELD -> { - owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() - } - else -> { - xcp.skipChildren() - } - } - } - - if (enabled && enabledTime == null) { - enabledTime = Instant.now() - } else if (!enabled) { - enabledTime = null - } - return Workflow( - id, - version, - requireNotNull(name) { "Workflow name is null" }, - enabled, - requireNotNull(schedule) { "Workflow schedule is null" }, - lastUpdateTime ?: Instant.now(), - enabledTime, - Workflow.WorkflowType.valueOf(workflowType.uppercase(Locale.ROOT)), - user, - schemaVersion, - inputs.toList(), - owner - ) - } - private suspend fun getWorkflow(): Workflow { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 7d2eb58a3..c183d3125 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -44,42 +44,57 @@ object CompositeWorkflowRunner : WorkflowRunner() { val isTempMonitor = dryRun || workflow.id == Workflow.NO_ID logger.debug("Workflow ${workflow.id} in $executionId execution is running") + val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + var monitors: List + try { - val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } - var monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) - // Validate the monitors size - validateMonitorSize(delegates, monitors, workflow) + monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + } catch (e: Exception) { + logger.error("Failed to execute workflow. Error: ${e.message}") + return workflowResult.copy(error = AlertingException.wrap(e)) + } + // Validate the monitors size + validateMonitorSize(delegates, monitors, workflow) - var workflowMetadata = AlertingConfigAccessor.getWorkflowMetadata( - monitorCtx.client!!, - monitorCtx.xContentRegistry!!, - "${workflow.id}-metadata" - ) - if (workflowMetadata == null) { - workflowMetadata = createWorkflowMetadata(workflow.id, delegates.map { it.monitorId }, executionId) - } + var workflowMetadata = AlertingConfigAccessor.getWorkflowMetadata( + monitorCtx.client!!, + monitorCtx.xContentRegistry!!, + "${workflow.id}-metadata" + ) + if (workflowMetadata == null) { + workflowMetadata = createWorkflowMetadata(workflow.id, delegates.map { it.monitorId }, executionId) + } - val monitorsById = monitors.associateBy { it.id } - val resultList = mutableListOf>() + val monitorsById = monitors.associateBy { it.id } + val resultList = mutableListOf>() + var lastErrorDelegateRun: Exception? = null - for (delegate in delegates) { - var indexToDocIds = mapOf>() - var delegateMonitor: Monitor - delegateMonitor = monitorsById[delegate.monitorId] + for (delegate in delegates) { + var indexToDocIds = mapOf>() + var delegateMonitor: Monitor + delegateMonitor = monitorsById[delegate.monitorId] + ?: throw AlertingException.wrap( + IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + if (delegate.chainedFindings != null) { + val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId] ?: throw AlertingException.wrap( - IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id") + IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") ) - if (delegate.chainedFindings != null) { - val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId] - ?: throw AlertingException.wrap( - IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") - ) + + try { indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId) + } catch (e: Exception) { + logger.error("Failed to execute workflow. Error: ${e.message}") + return workflowResult.copy(error = AlertingException.wrap(e)) } + } - val workflowRunContext = WorkflowRunContext(workflow.id, delegate.chainedFindings?.monitorId, executionId, indexToDocIds) + val workflowRunContext = WorkflowRunContext(workflow.id, delegate.chainedFindings?.monitorId, executionId, indexToDocIds) - val runResult = if (delegateMonitor.isBucketLevelMonitor()) { + var delegateRunResult: MonitorRunResult<*>? + try { + delegateRunResult = if (delegateMonitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor( delegateMonitor, monitorCtx, @@ -111,24 +126,24 @@ object CompositeWorkflowRunner : WorkflowRunner() { IllegalStateException("Unsupported monitor type") ) } - resultList.add(runResult) + } catch (ex: Exception) { + logger.error("Error executing workflow delegate. Error: ${ex.message}") + lastErrorDelegateRun = AlertingException.wrap(ex) + continue } - - logger.debug("Workflow ${workflow.id} in $executionId finished") - // Update metadata only if the workflow is not temp - if (!isTempMonitor) { - updateWorkflowMetadata( - monitorCtx.client!!, - monitorCtx.settings!!, - workflowMetadata.copy(latestRunTime = workflowExecutionStartTime, latestExecutionId = executionId) - ) - } - - return workflowResult.copy(workflowRunResult = resultList, executionEndTime = Instant.now()) - } catch (e: Exception) { - logger.error("Failed to execute workflow. Error: ${e.message}") - return workflowResult.copy(error = AlertingException.wrap(e)) + if (delegateRunResult != null) resultList.add(delegateRunResult) + } + logger.debug("Workflow ${workflow.id} in $executionId finished") + // Update metadata only if the workflow is not temp + if (!isTempMonitor) { + updateWorkflowMetadata( + monitorCtx.client!!, + monitorCtx.settings!!, + workflowMetadata.copy(latestRunTime = workflowExecutionStartTime, latestExecutionId = executionId) + ) } + + return workflowResult.copy(workflowRunResult = resultList, executionEndTime = Instant.now(), error = lastErrorDelegateRun) } private fun validateMonitorSize( @@ -138,7 +153,9 @@ object CompositeWorkflowRunner : WorkflowRunner() { ) { if (delegates.size != monitors.size) { val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString() - throw IllegalStateException("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + throw AlertingException.wrap( + IllegalStateException("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 3b7fde74c..5f368ba17 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -14,6 +14,9 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.rest.RestRequest +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.util.Collections class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { @@ -353,7 +356,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } - fun `test delete workflow underlying monitor deleted`() { + fun `test delete workflow delegate monitor deleted`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) @@ -401,7 +404,74 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } - fun `test delete workflow underlying monitor not deleted`() { + fun `test delete executed workflow with metadata deleted`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + deleteWorkflow(workflowId, true) + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + // Verify that the workflow metadata is deleted + try { + searchWorkflowMetadata(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetMonitor Action error ", + it.contains("List is empty") + ) + } + } + } + + fun `test delete workflow delegate monitor not deleted`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt index 18ad37928..8fd2f94ad 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt @@ -74,7 +74,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { monitorIds = listOf(monitorResponse.id, monitorResponse2.id) ) val workflowResponse = upsertWorkflow(workflow)!! - val workflowById = searchWorkflow(workflowResponse.id)!! + val workflowById = searchWorkflow(workflowResponse.id) assertNotNull(workflowById) var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) @@ -125,6 +125,69 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2")) } + fun `test execute workflow verify workflow metadata`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflowMonitor( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + val workflowMetadata = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse.workflowRunResult.executionId, + workflowMetadata!!.latestExecutionId + ) + + val executeWorkflowResponse1 = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults1.size) + + val workflowMetadata1 = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse1.workflowRunResult.executionId, + workflowMetadata1!!.latestExecutionId + ) + } + fun `test execute workflow with custom alerts and finding index with bucket level doc level delegates when bucket level delegate is used in chained finding`() { val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10d") @@ -192,7 +255,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! - val workflowById = searchWorkflow(workflowResponse.id)!! + val workflowById = searchWorkflow(workflowResponse.id) assertNotNull(workflowById) // Creates 5 documents @@ -329,7 +392,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id, docLevelMonitorResponse1.id, queryMonitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! - val workflowById = searchWorkflow(workflowResponse.id)!! + val workflowById = searchWorkflow(workflowResponse.id) assertNotNull(workflowById) // Creates 5 documents @@ -423,12 +486,12 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! - val workflowById = searchWorkflow(workflowResponse.id)!! + val workflowById = searchWorkflow(workflowResponse.id) assertNotNull(workflowById) deleteIndex(index) - val response = executeWorkflow(workflowById, workflowById.id, false)!! + val response = executeWorkflow(workflowById, workflowById!!.id, false)!! assertNotNull(response.workflowRunResult.error) assertTrue(response.workflowRunResult.error is AlertingException) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 23505d69b..21af929cb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -10,6 +10,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.ExecuteWorkflowRequest import org.opensearch.alerting.action.ExecuteWorkflowResponse +import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.json.JsonXContent @@ -65,6 +66,35 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { }.first() } + protected fun searchWorkflowMetadata( + id: String, + indices: String = ScheduledJob.SCHEDULED_JOBS_INDEX, + refresh: Boolean = true, + ): WorkflowMetadata? { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return null + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder("workflow_metadata.workflow_id", id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { it -> + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + lateinit var workflowMetadata: WorkflowMetadata + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow_metadata" -> workflowMetadata = WorkflowMetadata.parse(xcp) + } + } + workflowMetadata.copy(id = it.id) + }.first() + } + protected fun upsertWorkflow( workflow: Workflow, id: String = Workflow.NO_ID, @@ -93,10 +123,10 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { ).get() } - protected fun deleteWorkflow(workflowId: String, deleteUnderlyingMonitors: Boolean? = null) { + protected fun deleteWorkflow(workflowId: String, deleteDelegateMonitors: Boolean? = null) { client().execute( AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, - DeleteWorkflowRequest(workflowId, deleteUnderlyingMonitors, WriteRequest.RefreshPolicy.IMMEDIATE) + DeleteWorkflowRequest(workflowId, deleteDelegateMonitors, WriteRequest.RefreshPolicy.IMMEDIATE) ).get() } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 2babe37f4..630f99cfd 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -841,22 +841,6 @@ }, "latest_execution_id": { "type": "keyword" - }, - "last_action_execution_times": { - "type": "nested", - "properties": { - "action_id": { - "type": "keyword" - }, - "execution_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - } - } - }, - "last_run_context": { - "type": "object", - "enabled": false } } }